@@ -12,70 +12,64 @@ import kotlin.jvm.*
1212@SharedImmutable
1313private val UNDEFINED = Symbol (" UNDEFINED" )
1414
15- @NativeThreadLocal
16- internal object UndispatchedEventLoop {
17- data class EventLoop (
18- @JvmField var isActive : Boolean = false ,
19- @JvmField val queue : ArrayQueue <Runnable > = ArrayQueue ()
20- )
21-
22- @JvmField
23- internal val threadLocalEventLoop = CommonThreadLocal { EventLoop () }
24-
25- /* *
26- * Executes given [block] as part of current event loop, updating related to block [continuation]
27- * mode and state if continuation is not resumed immediately.
28- * [doYield] indicates whether current continuation is yielding (to provide fast-path if event-loop is empty).
29- * Returns `true` if execution of continuation was queued (trampolined) or `false` otherwise.
30- */
31- inline fun execute (continuation : DispatchedContinuation <* >, contState : Any? , mode : Int ,
32- doYield : Boolean = false, block : () -> Unit ) : Boolean {
33- val eventLoop = threadLocalEventLoop.get()
34- if (eventLoop.isActive) {
35- // If we are yielding and queue is empty, we can bail out as part of fast path
36- if (doYield && eventLoop.queue.isEmpty) {
37- return false
38- }
39-
40- continuation._state = contState
41- continuation.resumeMode = mode
42- eventLoop.queue.addLast(continuation)
43- return true
44- }
45-
46- runEventLoop(eventLoop, block)
47- return false
15+ /* *
16+ * Executes given [block] as part of current event loop, updating current continuation
17+ * mode and state if continuation is not resumed immediately.
18+ * [doYield] indicates whether current continuation is yielding (to provide fast-path if event-loop is empty).
19+ * Returns `true` if execution of continuation was queued (trampolined) or `false` otherwise.
20+ */
21+ private inline fun DispatchedContinuation <* >.executeUnconfined (
22+ contState : Any? , mode : Int , doYield : Boolean = false,
23+ block : () -> Unit
24+ ) : Boolean {
25+ val eventLoop = ThreadLocalEventLoop .eventLoop
26+ // If we are yielding and unconfined queue is empty, we can bail out as part of fast path
27+ if (doYield && eventLoop.isUnconfinedQueueEmpty) return false
28+ return if (eventLoop.isUnconfinedLoopActive) {
29+ // When unconfined loop is active -- dispatch continuation for execution to avoid stack overflow
30+ _state = contState
31+ resumeMode = mode
32+ eventLoop.dispatchUnconfined(this )
33+ true // queued into the active loop
34+ } else {
35+ // Was not active -- run event loop until all unconfined tasks are executed
36+ runUnconfinedEventLoop(eventLoop, block = block)
37+ false
4838 }
39+ }
4940
50- fun resumeUndispatched (task : DispatchedTask <* >): Boolean {
51- val eventLoop = threadLocalEventLoop.get()
52- if (eventLoop.isActive) {
53- eventLoop.queue.addLast(task)
54- return true
41+ private fun DispatchedTask <* >.resumeUnconfined () {
42+ val eventLoop = ThreadLocalEventLoop .eventLoop
43+ if (eventLoop.isUnconfinedLoopActive) {
44+ // When unconfined loop is active -- dispatch continuation for execution to avoid stack overflow
45+ eventLoop.dispatchUnconfined(this )
46+ } else {
47+ // Was not active -- run event loop until all unconfined tasks are executed
48+ runUnconfinedEventLoop(eventLoop) {
49+ resume(delegate, MODE_UNDISPATCHED )
5550 }
56-
57- runEventLoop(eventLoop, { task.resume(task.delegate, MODE_UNDISPATCHED ) })
58- return false
5951 }
52+ }
6053
61- inline fun runEventLoop (eventLoop : EventLoop , block : () -> Unit ) {
62- try {
63- eventLoop.isActive = true
64- block()
65- while (true ) {
66- val nextEvent = eventLoop.queue.removeFirstOrNull() ? : return
67- nextEvent.run ()
68- }
69- } catch (e: Throwable ) {
70- /*
71- * This exception doesn't happen normally, only if user either submitted throwing runnable
72- * or if we have a bug in implementation. Anyway, reset state of the dispatcher to the initial.
73- */
74- eventLoop.queue.clear()
75- throw DispatchException (" Unexpected exception in undispatched event loop, clearing pending tasks" , e)
76- } finally {
77- eventLoop.isActive = false
54+ private inline fun runUnconfinedEventLoop (
55+ eventLoop : EventLoop ,
56+ block : () -> Unit
57+ ) {
58+ eventLoop.incrementUseCount(unconfined = true )
59+ try {
60+ block()
61+ while (true ) {
62+ // break when all unconfined continuations where executed
63+ if (! eventLoop.processUnconfinedEvent()) break
7864 }
65+ } catch (e: Throwable ) {
66+ /*
67+ * This exception doesn't happen normally, only if user either submitted throwing runnable
68+ * or if we have a bug in implementation. Throw an exception that better explains the problem.
69+ */
70+ throw DispatchException (" Unexpected exception in unconfined event loop" , e)
71+ } finally {
72+ eventLoop.decrementUseCount(unconfined = true )
7973 }
8074}
8175
@@ -109,7 +103,7 @@ internal class DispatchedContinuation<in T>(
109103 resumeMode = MODE_ATOMIC_DEFAULT
110104 dispatcher.dispatch(context, this )
111105 } else {
112- UndispatchedEventLoop .execute( this , state, MODE_ATOMIC_DEFAULT ) {
106+ executeUnconfined( state, MODE_ATOMIC_DEFAULT ) {
113107 withCoroutineContext(this .context, countOrElement) {
114108 continuation.resumeWith(result)
115109 }
@@ -124,7 +118,7 @@ internal class DispatchedContinuation<in T>(
124118 resumeMode = MODE_CANCELLABLE
125119 dispatcher.dispatch(context, this )
126120 } else {
127- UndispatchedEventLoop .execute( this , value, MODE_CANCELLABLE ) {
121+ executeUnconfined( value, MODE_CANCELLABLE ) {
128122 if (! resumeCancelled()) {
129123 resumeUndispatched(value)
130124 }
@@ -141,7 +135,7 @@ internal class DispatchedContinuation<in T>(
141135 resumeMode = MODE_CANCELLABLE
142136 dispatcher.dispatch(context, this )
143137 } else {
144- UndispatchedEventLoop .execute( this , state, MODE_CANCELLABLE ) {
138+ executeUnconfined( state, MODE_CANCELLABLE ) {
145139 if (! resumeCancelled()) {
146140 resumeUndispatchedWithException(exception)
147141 }
@@ -207,7 +201,7 @@ internal fun <T> Continuation<T>.resumeDirectWithException(exception: Throwable)
207201}
208202
209203internal abstract class DispatchedTask <in T >(
210- @JvmField var resumeMode : Int
204+ @JvmField public var resumeMode : Int
211205) : SchedulerTask() {
212206 public abstract val delegate: Continuation <T >
213207
@@ -248,7 +242,7 @@ internal abstract class DispatchedTask<in T>(
248242}
249243
250244internal fun DispatchedContinuation<Unit>.yieldUndispatched (): Boolean =
251- UndispatchedEventLoop .execute( this , Unit , MODE_CANCELLABLE , doYield = true ) {
245+ executeUnconfined( Unit , MODE_CANCELLABLE , doYield = true ) {
252246 run ()
253247 }
254248
@@ -261,7 +255,7 @@ internal fun <T> DispatchedTask<T>.dispatch(mode: Int = MODE_CANCELLABLE) {
261255 if (dispatcher.isDispatchNeeded(context)) {
262256 dispatcher.dispatch(context, this )
263257 } else {
264- UndispatchedEventLoop .resumeUndispatched( this )
258+ resumeUnconfined( )
265259 }
266260 } else {
267261 resume(delegate, mode)
0 commit comments