1616
1717package kotlinx.coroutines.experimental
1818
19- import kotlin.browser.*
2019import kotlin.coroutines.experimental.*
20+ import org.w3c.dom.*
2121
22- internal object JSDispatcher : CoroutineDispatcher(), Delay {
23- // Check if we are in the browser and must use postMessage to avoid setTimeout throttling
24- private val messageQueue =
25- if (jsTypeOf(window) != " undefined" ) MessageQueue ().apply { register() } else null
26-
22+ internal class NodeDispatcher : CoroutineDispatcher (), Delay {
2723 override fun dispatch (context : CoroutineContext , block : Runnable ) {
28- if (messageQueue != null ) {
29- messageQueue.enqueue(block)
30- } else {
31- setTimeout({ block.run () }, 0 )
32- }
24+ setTimeout({ block.run () }, 0 )
3325 }
3426
3527 override fun scheduleResumeAfterDelay (time : Int , continuation : CancellableContinuation <Unit >) {
@@ -46,60 +38,105 @@ internal object JSDispatcher : CoroutineDispatcher(), Delay {
4638 }
4739}
4840
49- // it is open for tests
50- internal open class MessageQueue {
51- val yieldEvery = 16 // yield to JS event loop after this many processed messages
52-
53- private val messageName = " JSDispatcher.dispatch"
54- private var scheduled = false
41+ internal class WindowDispatcher (private val window : Window ) : CoroutineDispatcher(), Delay {
42+ private val messageName = " dispatchCoroutine"
5543
56- private var queue = arrayOfNulls<Runnable >(8 )
57- private var head = 0
58- private var tail = 0
44+ private val queue = object : MessageQueue () {
45+ override fun schedule () {
46+ window.postMessage(messageName, " *" )
47+ }
48+ }
5949
60- fun register () {
50+ init {
6151 window.addEventListener(" message" , { event: dynamic ->
6252 if (event.source == window && event.data == messageName) {
6353 event.stopPropagation()
64- process()
54+ queue. process()
6555 }
6656 }, true )
6757 }
6858
69- // it is open for tests
70- open fun schedule () {
71- window.postMessage(messageName, " *" )
59+ override fun dispatch (context : CoroutineContext , block : Runnable ) {
60+ queue.enqueue(block)
61+ }
62+
63+ override fun scheduleResumeAfterDelay (time : Int , continuation : CancellableContinuation <Unit >) {
64+ window.setTimeout({ with (continuation) { resumeUndispatched(Unit ) } }, time.coerceAtLeast(0 ))
65+ }
66+
67+ override fun invokeOnTimeout (time : Int , block : Runnable ): DisposableHandle {
68+ val handle = window.setTimeout({ block.run () }, time.coerceAtLeast(0 ))
69+ return object : DisposableHandle {
70+ override fun dispose () {
71+ window.clearTimeout(handle)
72+ }
73+ }
74+ }
75+ }
76+
77+ internal abstract class MessageQueue : Queue <Runnable >() {
78+ val yieldEvery = 16 // yield to JS event loop after this many processed messages
79+
80+ private var scheduled = false
81+
82+ abstract fun schedule ()
83+
84+ fun enqueue (element : Runnable ) {
85+ add(element)
86+ if (! scheduled) {
87+ scheduled = true
88+ schedule()
89+ }
7290 }
7391
92+ fun process () {
93+ try {
94+ // limit number of processed messages
95+ repeat(yieldEvery) {
96+ val element = poll() ? : return @process
97+ element.run ()
98+ }
99+ } finally {
100+ if (isEmpty) {
101+ scheduled = false
102+ } else {
103+ schedule()
104+ }
105+ }
106+ }
107+ }
108+
109+ internal open class Queue <T : Any > {
110+ private var queue = arrayOfNulls<Any ?>(8 )
111+ private var head = 0
112+ private var tail = 0
113+
74114 val isEmpty get() = head == tail
75115
76- fun poll (): Runnable ? {
116+ fun poll (): T ? {
77117 if (isEmpty) return null
78118 val result = queue[head]!!
79119 queue[head] = null
80120 head = head.next()
81- return result
121+ @Suppress(" UNCHECKED_CAST" )
122+ return result as T
82123 }
83124
84- tailrec fun enqueue ( block : Runnable ) {
125+ tailrec fun add ( element : T ) {
85126 val newTail = tail.next()
86127 if (newTail == head) {
87128 resize()
88- enqueue(block ) // retry with larger size
129+ add(element ) // retry with larger size
89130 return
90131 }
91- queue[tail] = block
132+ queue[tail] = element
92133 tail = newTail
93- if (! scheduled) {
94- scheduled = true
95- schedule()
96- }
97134 }
98135
99- fun resize () {
136+ private fun resize () {
100137 var i = head
101138 var j = 0
102- val a = arrayOfNulls<Runnable >(queue.size * 2 )
139+ val a = arrayOfNulls<Any ? >(queue.size * 2 )
103140 while (i != tail) {
104141 a[j++ ] = queue[i]
105142 i = i.next()
@@ -113,22 +150,6 @@ internal open class MessageQueue {
113150 val j = this + 1
114151 return if (j == queue.size) 0 else j
115152 }
116-
117- fun process () {
118- try {
119- // limit number of processed messages
120- repeat(yieldEvery) {
121- val block = poll() ? : return @process
122- block.run ()
123- }
124- } finally {
125- if (isEmpty) {
126- scheduled = false
127- } else {
128- schedule()
129- }
130- }
131- }
132153}
133154
134155// We need to reference global setTimeout and clearTimeout so that it works on Node.JS as opposed to
0 commit comments