1616
1717package kotlinx.coroutines.experimental
1818
19- import kotlinx.coroutines.experimental.internal.LockFreeLinkedListHead
20- import kotlinx.coroutines.experimental.internal.LockFreeLinkedListNode
21- import kotlinx.coroutines.experimental.internal.ThreadSafeHeap
22- import kotlinx.coroutines.experimental.internal.ThreadSafeHeapNode
23- import java.util.concurrent.TimeUnit
24- import java.util.concurrent.locks.LockSupport
25- import kotlin.coroutines.experimental.CoroutineContext
19+ import kotlinx.atomicfu.*
20+ import kotlinx.coroutines.experimental.internal.*
21+ import java.util.concurrent.*
22+ import java.util.concurrent.locks.*
23+ import kotlin.coroutines.experimental.*
2624
2725/* *
2826 * Implemented by [CoroutineDispatcher] implementations that have event loop inside and can
@@ -67,6 +65,7 @@ public interface EventLoop {
6765 * }
6866 * ```
6967 */
68+ @Suppress(" FunctionName" )
7069public fun EventLoop (thread : Thread = Thread .currentThread(), parentJob : Job ? = null): CoroutineDispatcher =
7170 EventLoopImpl (thread).apply {
7271 if (parentJob != null ) initParentJob(parentJob)
@@ -76,30 +75,49 @@ private const val DELAYED = 0
7675private const val REMOVED = 1
7776private const val RESCHEDULED = 2
7877
78+ @Suppress(" PrivatePropertyName" )
79+ private val CLOSED_EMPTY = Symbol (" CLOSED_EMPTY" )
80+
81+ private typealias Queue <T > = LockFreeMPSCQueueCore <T >
82+
7983internal abstract class EventLoopBase : CoroutineDispatcher (), Delay, EventLoop {
80- private val queue = LockFreeLinkedListHead ()
81- private val delayed = ThreadSafeHeap <DelayedTask >()
84+ // null | CLOSED_EMPTY | task | Queue<Runnable>
85+ private val _queue = atomic<Any ?>(null )
86+
87+ // Allocated only only once
88+ private val _delayed = atomic<ThreadSafeHeap <DelayedTask >? > (null )
8289
83- protected abstract val canComplete: Boolean
8490 protected abstract val isCompleted: Boolean
8591 protected abstract fun unpark ()
8692 protected abstract fun isCorrectThread (): Boolean
8793
8894 protected val isEmpty: Boolean
89- get() = queue.isEmpty && delayed.isEmpty
95+ get() = isQueueEmpty && isDelayedEmpty
96+
97+ private val isQueueEmpty: Boolean get() {
98+ val queue = _queue .value
99+ return when (queue) {
100+ null -> true
101+ is Queue <* > -> queue.isEmpty
102+ else -> queue == = CLOSED_EMPTY
103+ }
104+ }
105+
106+ private val isDelayedEmpty: Boolean get() {
107+ val delayed = _delayed .value
108+ return delayed == null || delayed.isEmpty
109+ }
90110
91111 private val nextTime: Long
92112 get() {
93- if (! queue.isEmpty) return 0
113+ if (! isQueueEmpty) return 0
114+ val delayed = _delayed .value ? : return Long .MAX_VALUE
94115 val nextDelayedTask = delayed.peek() ? : return Long .MAX_VALUE
95116 return (nextDelayedTask.nanoTime - timeSource.nanoTime()).coerceAtLeast(0 )
96117 }
97118
98- fun execute (block : Runnable ) =
99- enqueue(block.toQueuedTask())
100-
101119 override fun dispatch (context : CoroutineContext , block : Runnable ) =
102- enqueue (block.toQueuedTask() )
120+ execute (block)
103121
104122 override fun scheduleResumeAfterDelay (time : Long , unit : TimeUnit , continuation : CancellableContinuation <Unit >) =
105123 schedule(DelayedResumeTask (time, unit, continuation))
@@ -110,43 +128,101 @@ internal abstract class EventLoopBase: CoroutineDispatcher(), Delay, EventLoop {
110128 override fun processNextEvent (): Long {
111129 if (! isCorrectThread()) return Long .MAX_VALUE
112130 // queue all delayed tasks that are due to be executed
113- if (! delayed.isEmpty) {
131+ val delayed = _delayed .value
132+ if (delayed != null && ! delayed.isEmpty) {
114133 val now = timeSource.nanoTime()
115134 while (true ) {
116135 // make sure that moving from delayed to queue removes from delayed only after it is added to queue
117136 // to make sure that 'isEmpty' and `nextTime` that check both of them
118137 // do not transiently report that both delayed and queue are empty during move
119138 delayed.removeFirstIf {
120139 if (it.timeToExecute(now)) {
121- queue.addLast(it)
122- true // proceed with remove
140+ enqueueImpl(it)
123141 } else
124142 false
125- } ? : break // quit loop when nothing more to remove
143+ } ? : break // quit loop when nothing more to remove or enqueueImpl returns false on "isComplete"
126144 }
127145 }
128146 // then process one event from queue
129- (queue.removeFirstOrNull() as ? QueuedTask )?.run ()
147+ dequeue( )?.run ()
130148 return nextTime
131149 }
132150
133- private fun Runnable.toQueuedTask (): QueuedTask =
134- if (this is QueuedTask && isFresh) this else QueuedRunnableTask (this )
135-
136- internal fun enqueue (queuedTask : QueuedTask ) {
137- if (enqueueImpl(queuedTask)) {
151+ @Suppress(" MemberVisibilityCanBePrivate" ) // todo: remove suppress when KT-22030 is fixed
152+ internal fun execute (task : Runnable ) {
153+ if (enqueueImpl(task)) {
138154 // todo: we should unpark only when this delayed task became first in the queue
139155 unpark()
140156 } else
141- DefaultExecutor .enqueue(queuedTask)
157+ DefaultExecutor .execute(task)
158+ }
159+
160+ @Suppress(" UNCHECKED_CAST" )
161+ private fun enqueueImpl (task : Runnable ): Boolean {
162+ _queue .loop { queue ->
163+ if (isCompleted) return false // fail fast if already completed, may still add, but queues will close
164+ when (queue) {
165+ null -> if (_queue .compareAndSet(null , task)) return true
166+ is Queue <* > -> {
167+ when ((queue as Queue <Runnable >).addLast(task)) {
168+ Queue .ADD_SUCCESS -> return true
169+ Queue .ADD_CLOSED -> return false
170+ Queue .ADD_FROZEN -> _queue .compareAndSet(queue, queue.next())
171+ }
172+ }
173+ else -> when {
174+ queue == = CLOSED_EMPTY -> return false
175+ else -> {
176+ // update to full-blown queue to add one more
177+ val newQueue = Queue <Runnable >(Queue .INITIAL_CAPACITY )
178+ newQueue.addLast(queue as Runnable )
179+ newQueue.addLast(task)
180+ if (_queue .compareAndSet(queue, newQueue)) return true
181+ }
182+ }
183+ }
184+ }
142185 }
143186
144- private fun enqueueImpl (queuedTask : QueuedTask ): Boolean {
145- if (! canComplete) {
146- queue.addLast(queuedTask)
147- return true
187+ @Suppress(" UNCHECKED_CAST" )
188+ private fun dequeue (): Runnable ? {
189+ _queue .loop { queue ->
190+ when (queue) {
191+ null -> return null
192+ is Queue <* > -> {
193+ val result = (queue as Queue <Runnable >).removeFirstOrNull()
194+ if (result != = Queue .REMOVE_FROZEN ) return result as Runnable ?
195+ _queue .compareAndSet(queue, queue.next())
196+ }
197+ else -> when {
198+ queue == = CLOSED_EMPTY -> return null
199+ else -> if (_queue .compareAndSet(queue, null )) return queue as Runnable
200+ }
201+ }
148202 }
149- return queue.addLastIf(queuedTask) { ! isCompleted }
203+ }
204+
205+ protected fun closeQueue () {
206+ assert (isCompleted)
207+ _queue .loop { queue ->
208+ when (queue) {
209+ null -> if (_queue .compareAndSet(null , CLOSED_EMPTY )) return
210+ is Queue <* > -> {
211+ queue.close()
212+ return
213+ }
214+ else -> when {
215+ queue == = CLOSED_EMPTY -> return
216+ else -> {
217+ // update to full-blown queue to close
218+ val newQueue = Queue <Runnable >(Queue .INITIAL_CAPACITY )
219+ newQueue.addLast(queue as Runnable )
220+ if (_queue .compareAndSet(queue, newQueue)) return
221+ }
222+ }
223+ }
224+ }
225+
150226 }
151227
152228 internal fun schedule (delayedTask : DelayedTask ) {
@@ -158,43 +234,37 @@ internal abstract class EventLoopBase: CoroutineDispatcher(), Delay, EventLoop {
158234 }
159235
160236 private fun scheduleImpl (delayedTask : DelayedTask ): Boolean {
161- if (! canComplete) {
162- delayed.addLast(delayedTask)
163- return true
237+ if (isCompleted) return false
238+ val delayed = _delayed .value ? : run {
239+ _delayed .compareAndSet(null , ThreadSafeHeap ())
240+ _delayed .value!!
164241 }
165242 return delayed.addLastIf(delayedTask) { ! isCompleted }
166243 }
167244
168245 internal fun removeDelayedImpl (delayedTask : DelayedTask ) {
169- delayed .remove(delayedTask)
246+ _delayed .value? .remove(delayedTask)
170247 }
171248
172- protected fun clearAll () {
173- while (true ) queue.removeFirstOrNull() ? : break
174- while (true ) delayed.removeFirstOrNull() ? : break
249+ // It performs "hard" shutdown for test cleanup purposes
250+ protected fun resetAll () {
251+ _queue .value = null
252+ _delayed .value = null
175253 }
176254
255+ // This is a "soft" (normal) shutdown
177256 protected fun rescheduleAllDelayed () {
178257 while (true ) {
179- val delayedTask = delayed .removeFirstOrNull() ? : break
258+ val delayedTask = _delayed .value? .removeFirstOrNull() ? : break
180259 delayedTask.rescheduleOnShutdown()
181260 }
182261 }
183262
184- internal abstract class QueuedTask : LockFreeLinkedListNode (), Runnable
185-
186- private class QueuedRunnableTask (
187- private val block : Runnable
188- ) : QueuedTask() {
189- override fun run () { block.run () }
190- override fun toString (): String = block.toString()
191- }
192-
193263 internal abstract inner class DelayedTask (
194264 time : Long , timeUnit : TimeUnit
195- ) : QueuedTask() , Comparable<DelayedTask>, DisposableHandle, ThreadSafeHeapNode {
265+ ) : Runnable , Comparable<DelayedTask>, DisposableHandle, ThreadSafeHeapNode {
196266 override var index: Int = - 1
197- var state = DELAYED
267+ var state = DELAYED // Guarded by by lock on this task for reschedule/dispose purposes
198268 @JvmField val nanoTime: Long = timeSource.nanoTime() + timeUnit.toNanos(time)
199269
200270 override fun compareTo (other : DelayedTask ): Int {
@@ -208,18 +278,18 @@ internal abstract class EventLoopBase: CoroutineDispatcher(), Delay, EventLoop {
208278
209279 fun timeToExecute (now : Long ): Boolean = now - nanoTime >= 0L
210280
211- fun rescheduleOnShutdown () = synchronized(delayed ) {
281+ fun rescheduleOnShutdown () = synchronized(this ) {
212282 if (state != DELAYED ) return
213- if (delayed .remove(this )) {
283+ if (_delayed .value !! .remove(this )) {
214284 state = RESCHEDULED
215285 DefaultExecutor .schedule(this )
216286 } else
217287 state = REMOVED
218288 }
219289
220- override final fun dispose () = synchronized(delayed ) {
290+ final override fun dispose () = synchronized(this ) {
221291 when (state) {
222- DELAYED -> delayed .remove(this )
292+ DELAYED -> _delayed .value? .remove(this )
223293 RESCHEDULED -> DefaultExecutor .removeDelayedImpl(this )
224294 else -> return
225295 }
@@ -258,7 +328,7 @@ internal abstract class ThreadEventLoop(
258328 }
259329
260330 fun shutdown () {
261- assert (isCompleted )
331+ closeQueue( )
262332 assert (isCorrectThread())
263333 // complete processing of all queued tasks
264334 while (processNextEvent() <= 0 ) { /* spin */ }
@@ -270,7 +340,6 @@ internal abstract class ThreadEventLoop(
270340private class EventLoopImpl (thread : Thread ) : ThreadEventLoop(thread) {
271341 private var parentJob: Job ? = null
272342
273- override val canComplete: Boolean get() = parentJob != null
274343 override val isCompleted: Boolean get() = parentJob?.isCompleted == true
275344
276345 fun initParentJob (parentJob : Job ) {
@@ -280,7 +349,6 @@ private class EventLoopImpl(thread: Thread) : ThreadEventLoop(thread) {
280349}
281350
282351internal class BlockingEventLoop (thread : Thread ) : ThreadEventLoop(thread) {
283- override val canComplete: Boolean get() = true
284352 @Volatile
285353 public override var isCompleted: Boolean = false
286354}
0 commit comments