Skip to content

Commit a198bad

Browse files
committed
Delay implementation in Swing, JavaFx, and scheduled executors is fixed to avoid an extra dispatch
CancellableContinuation.resumeUndispatched is introduced to make this efficient implementation possible Additional documentation for CoroutineDispatcher implementors
1 parent 2fd7cb3 commit a198bad

File tree

10 files changed

+214
-47
lines changed

10 files changed

+214
-47
lines changed

kotlinx-coroutines-core/pom.xml

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,17 @@
4545
<argLine>-ea -Xmx1g -Xms1g</argLine>
4646
</configuration>
4747
</plugin>
48+
<plugin>
49+
<groupId>org.apache.maven.plugins</groupId>
50+
<artifactId>maven-jar-plugin</artifactId>
51+
<executions>
52+
<execution>
53+
<goals>
54+
<goal>test-jar</goal>
55+
</goals>
56+
</execution>
57+
</executions>
58+
</plugin>
4859
</plugins>
4960
</build>
5061
</project>

kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CancellableContinuation.kt

Lines changed: 55 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -49,32 +49,54 @@ public interface CancellableContinuation<in T> : Continuation<T>, Job {
4949
*
5050
* It implies that [isActive] is `false` and [isCompleted] is `true`.
5151
*/
52-
val isCancelled: Boolean
52+
public val isCancelled: Boolean
5353

5454
/**
5555
* Tries to resume this continuation with a given value and returns non-null object token if it was successful,
5656
* or `null` otherwise (it was already resumed or cancelled). When non-null object was returned,
5757
* [completeResume] must be invoked with it.
5858
*/
59-
fun tryResume(value: T): Any?
59+
public fun tryResume(value: T): Any?
6060

6161
/**
6262
* Tries to resume this continuation with a given exception and returns non-null object token if it was successful,
6363
* or `null` otherwise (it was already resumed or cancelled). When non-null object was returned,
6464
* [completeResume] must be invoked with it.
6565
*/
66-
fun tryResumeWithException(exception: Throwable): Any?
66+
public fun tryResumeWithException(exception: Throwable): Any?
6767

6868
/**
6969
* Completes the execution of [tryResume] or [tryResumeWithException] on its non-null result.
7070
*/
71-
fun completeResume(token: Any)
71+
public fun completeResume(token: Any)
7272

7373
/**
7474
* Makes this continuation cancellable. Use it with `holdCancellability` optional parameter to
7575
* [suspendCancellableCoroutine] function. It throws [IllegalStateException] if invoked more than once.
7676
*/
77-
fun initCancellability()
77+
public fun initCancellability()
78+
79+
/**
80+
* Resumes this continuation with a given [value] in the invoker thread without going though
81+
* [dispatch][CoroutineDispatcher.dispatch] function of the [CoroutineDispatcher] in the [context].
82+
* This function is designed to be used only by the [CoroutineDispatcher] implementations themselves.
83+
* **It should not be used in general code**.
84+
*
85+
* The receiver [CoroutineDispatcher] of this function be equal to the context dispatcher or
86+
* [IllegalArgumentException] if thrown.
87+
*/
88+
public fun CoroutineDispatcher.resumeUndispatched(value: T)
89+
90+
/**
91+
* Resumes this continuation with a given [exception] in the invoker thread without going though
92+
* [dispatch][CoroutineDispatcher.dispatch] function of the [CoroutineDispatcher] in the [context].
93+
* This function is designed to be used only by the [CoroutineDispatcher] implementations themselves.
94+
* **It should not be used in general code**.
95+
*
96+
* The receiver [CoroutineDispatcher] of this function be equal to the context dispatcher or
97+
* [IllegalArgumentException] if thrown.
98+
*/
99+
public fun CoroutineDispatcher.resumeUndispatchedWithException(exception: Throwable)
78100
}
79101

80102
/**
@@ -123,6 +145,7 @@ internal class SafeCancellableContinuation<in T>(
123145
const val SUSPENDED = 1
124146
const val RESUMED = 2
125147
const val YIELD = 3 // used by cancellable "yield"
148+
const val UNDISPATCHED = 4 // used by "undispatchedXXX"
126149
}
127150

128151
override fun initCancellability() {
@@ -173,12 +196,21 @@ internal class SafeCancellableContinuation<in T>(
173196
val decision = this.decision // volatile read
174197
if (decision == UNDECIDED && DECISION.compareAndSet(this, UNDECIDED, RESUMED)) return // will get result in getResult
175198
// otherwise, getResult has already commenced, i.e. it was resumed later or in other thread
199+
when {
200+
decision == UNDISPATCHED -> undispatchedCompletion(state)
201+
state is CompletedExceptionally -> delegate.resumeWithException(state.exception)
202+
decision == YIELD && delegate is DispatchedContinuation -> delegate.resumeYield(parentJob, state as T)
203+
else -> delegate.resume(state as T)
204+
}
205+
}
206+
207+
@Suppress("UNCHECKED_CAST")
208+
private fun undispatchedCompletion(state: Any?) {
209+
delegate as DispatchedContinuation // type assertion -- was checked in resumeUndispatched
176210
if (state is CompletedExceptionally)
177-
delegate.resumeWithException(state.exception)
178-
else if (decision == YIELD && delegate is DispatchedContinuation)
179-
delegate.resumeYield(parentJob, state as T)
211+
delegate.resumeUndispatchedWithException(state.exception)
180212
else
181-
delegate.resume(state as T)
213+
delegate.resumeUndispatched(state as T)
182214
}
183215

184216
// can only be invoked in the same thread as getResult (see "yield"), afterCompletion may be concurrent
@@ -187,4 +219,18 @@ internal class SafeCancellableContinuation<in T>(
187219
DECISION.compareAndSet(this, UNDECIDED, YIELD) // try mark as needing dispatch
188220
resume(value)
189221
}
222+
223+
override fun CoroutineDispatcher.resumeUndispatched(value: T) {
224+
val dc = delegate as? DispatchedContinuation ?: throw IllegalArgumentException("Must be used with DispatchedContinuation")
225+
check(dc.dispatcher === this) { "Must be invoked from the context CoroutineDispatcher"}
226+
DECISION.compareAndSet(this@SafeCancellableContinuation, SUSPENDED, UNDISPATCHED)
227+
resume(value)
228+
}
229+
230+
override fun CoroutineDispatcher.resumeUndispatchedWithException(exception: Throwable) {
231+
val dc = delegate as? DispatchedContinuation ?: throw IllegalArgumentException("Must be used with DispatchedContinuation")
232+
check(dc.dispatcher === this) { "Must be invoked from the context CoroutineDispatcher"}
233+
DECISION.compareAndSet(this@SafeCancellableContinuation, SUSPENDED, UNDISPATCHED)
234+
resumeWithException(exception)
235+
}
190236
}

kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CoroutineDispatcher.kt

Lines changed: 42 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,31 @@ import kotlin.coroutines.experimental.CoroutineContext
4141
public abstract class CoroutineDispatcher :
4242
AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {
4343
/**
44-
* Return `true` if execution shall be dispatched onto another thread.
44+
* Returns `true` if execution shall be dispatched onto another thread.
4545
* The default behaviour for most dispatchers is to return `true`.
46+
*
47+
* UI dispatchers _should not_ override `isDispatchNeeded`, but leave a default implementation that
48+
* returns `true`. To understand the rationale beyond this recommendation, consider the following code:
49+
*
50+
* ```kotlin
51+
* fun asyncUpdateUI() = async(MainThread) {
52+
* // do something here that updates something in UI
53+
* }
54+
* ```
55+
*
56+
* When you invoke `asyncUpdateUI` in some background thread, it immediately continues to the next
57+
* line, while UI update happens asynchronously in the UI thread. However, if you invoke
58+
* it in the UI thread itself, it updates UI _synchronously_ if your `isDispatchNeeded` is
59+
* overridden with a thread check. Checking if we are already in the UI thread seems more
60+
* efficient (and it might indeed save a few CPU cycles), but this subtle and context-sensitive
61+
* difference in behavior makes the resulting async code harder to debug.
62+
*
63+
* Basically, the choice here is between "JS-style" asynchronous approach (async actions
64+
* are always postponed to be executed later in the even dispatch thread) and "C#-style" approach
65+
* (async actions are executed in the invoker thread until the first suspension point).
66+
* While, C# approach seems to be more efficient, it ends up with recommendations like
67+
* "use `yield` if you need to ....". This is error-prone. JS-style approach is more consistent
68+
* and does not require programmers to think about whether they need to yield or not.
4669
*/
4770
public open fun isDispatchNeeded(context: CoroutineContext): Boolean = true
4871

@@ -78,28 +101,34 @@ internal class DispatchedContinuation<in T>(
78101
val context = continuation.context
79102
if (dispatcher.isDispatchNeeded(context))
80103
dispatcher.dispatch(context, Runnable {
81-
withCoroutineContext(context) {
82-
continuation.resume(value)
83-
}
104+
resumeUndispatched(value)
84105
})
85106
else
86-
withCoroutineContext(context) {
87-
continuation.resume(value)
88-
}
107+
resumeUndispatched(value)
108+
}
109+
110+
@Suppress("NOTHING_TO_INLINE") // we need it inline to save us an entry on the stack
111+
inline fun resumeUndispatched(value: T) {
112+
withCoroutineContext(context) {
113+
continuation.resume(value)
114+
}
89115
}
90116

91117
override fun resumeWithException(exception: Throwable) {
92118
val context = continuation.context
93119
if (dispatcher.isDispatchNeeded(context))
94120
dispatcher.dispatch(context, Runnable {
95-
withCoroutineContext(context) {
96-
continuation.resumeWithException(exception)
97-
}
121+
resumeUndispatchedWithException(exception)
98122
})
99123
else
100-
withCoroutineContext(context) {
101-
continuation.resumeWithException(exception)
102-
}
124+
resumeUndispatchedWithException(exception)
125+
}
126+
127+
@Suppress("NOTHING_TO_INLINE") // we need it inline to save us an entry on the stack
128+
inline fun resumeUndispatchedWithException(exception: Throwable) {
129+
withCoroutineContext(context) {
130+
continuation.resumeWithException(exception)
131+
}
103132
}
104133

105134
// used by "yield" implementation

kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Delay.kt

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,18 @@ public interface Delay {
3838

3939
/**
4040
* Schedules resume of a specified [continuation] after a specified delay [time].
41+
*
42+
* Continuation **must be scheduled** to resume even if it is already cancelled, because a cancellation is just
43+
* an exception that the coroutine that used `delay` might wanted to catch and process. It might
44+
* need to close some resources in its `finally` blocks, for example.
45+
*
46+
* This implementation is supposed to use dispatcher's native ability for scheduled execution in its thread(s).
47+
* In order to avoid an extra delay of execution, the following code shall be used to resume this
48+
* [continuation] when the code is already executing in the appropriate thread:
49+
*
50+
* ```kotlin
51+
* with(continuation) { resumeUndispatched(Unit) }
52+
* ```
4153
*/
4254
fun scheduleResumeAfterDelay(time: Long, unit: TimeUnit, continuation: CancellableContinuation<Unit>)
4355
}
@@ -59,6 +71,7 @@ suspend fun delay(time: Long, unit: TimeUnit = TimeUnit.MILLISECONDS) {
5971
scheduleResumeAfterDelay(time, unit, cont)
6072
return@sc
6173
}
62-
scheduledExecutor.scheduleResumeAfterDelay(time, unit, cont)
74+
val timeout = scheduledExecutor.schedule(ResumeRunnable(cont), time, unit)
75+
cont.cancelFutureOnCompletion(timeout)
6376
}
6477
}

kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Executors.kt

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,9 @@
1717
package kotlinx.coroutines.experimental
1818

1919
import java.util.concurrent.Executor
20-
import java.util.concurrent.ExecutorService
2120
import java.util.concurrent.ScheduledExecutorService
2221
import java.util.concurrent.TimeUnit
22+
import kotlin.coroutines.experimental.Continuation
2323
import kotlin.coroutines.experimental.CoroutineContext
2424

2525
/**
@@ -32,16 +32,26 @@ internal open class ExecutorCoroutineDispatcher(val executor: Executor) : Corout
3232
override fun dispatch(context: CoroutineContext, block: Runnable) = executor.execute(block)
3333

3434
override fun scheduleResumeAfterDelay(time: Long, unit: TimeUnit, continuation: CancellableContinuation<Unit>) {
35-
(executor as? ScheduledExecutorService ?: scheduledExecutor).scheduleResumeAfterDelay(time, unit, continuation)
35+
val timeout = if (executor is ScheduledExecutorService)
36+
executor.schedule(ResumeUndispatchedRunnable(this, continuation), time, unit) else
37+
scheduledExecutor.schedule(ResumeRunnable(continuation), time, unit)
38+
continuation.cancelFutureOnCompletion(timeout)
3639
}
3740
}
3841

39-
internal fun ExecutorService.scheduleResume(cont: CancellableContinuation<Unit>) {
40-
val future = submit { cont.resume(Unit) }
41-
cont.cancelFutureOnCompletion(future)
42+
// --- reusing these classes in other places ---
43+
44+
internal class ResumeUndispatchedRunnable(
45+
val dispatcher: CoroutineDispatcher,
46+
val continuation: CancellableContinuation<Unit>
47+
) : Runnable {
48+
override fun run() {
49+
with(continuation) { dispatcher.resumeUndispatched(Unit) }
50+
}
4251
}
4352

44-
internal fun ScheduledExecutorService.scheduleResumeAfterDelay(time: Long, unit: TimeUnit, cont: CancellableContinuation<Unit>) {
45-
val timeout = schedule({ cont.resume(Unit) }, time, unit)
46-
cont.cancelFutureOnCompletion(timeout)
53+
internal class ResumeRunnable(val continuation: Continuation<Unit>) : Runnable {
54+
override fun run() {
55+
continuation.resume(Unit)
56+
}
4757
}

kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/ThreadPoolDispatcher.kt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ private class ThreadPoolDispatcher(
6262
override fun dispatch(context: CoroutineContext, block: Runnable) = executor.execute(block)
6363

6464
override fun scheduleResumeAfterDelay(time: Long, unit: TimeUnit, continuation: CancellableContinuation<Unit>) {
65-
executor.scheduleResumeAfterDelay(time, unit, continuation)
65+
val timeout = executor.schedule(ResumeUndispatchedRunnable(this, continuation), time, unit)
66+
continuation.cancelFutureOnCompletion(timeout)
6667
}
6768
}

kotlinx-coroutines-javafx/src/main/kotlin/kotlinx/coroutines/experimental/javafx/JavaFx.kt

Lines changed: 17 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -50,24 +50,28 @@ object JavaFx : CoroutineDispatcher(), Delay {
5050
}
5151

5252
override fun scheduleResumeAfterDelay(time: Long, unit: TimeUnit, continuation: CancellableContinuation<Unit>) {
53-
val timeline = Timeline(KeyFrame(Duration.millis(unit.toMillis(time).toDouble()),
54-
EventHandler<ActionEvent> { continuation.resume(Unit) }))
53+
val handler = EventHandler<ActionEvent> {
54+
with(continuation) { resumeUndispatched(Unit) }
55+
}
56+
val timeline = Timeline(KeyFrame(Duration.millis(unit.toMillis(time).toDouble()), handler))
5557
timeline.play()
5658
continuation.onCompletion { timeline.stop() }
5759
}
58-
}
5960

60-
private class PulseTimer : AnimationTimer() {
61-
val next = CopyOnWriteArrayList<CancellableContinuation<Long>>()
61+
private class PulseTimer : AnimationTimer() {
62+
val next = CopyOnWriteArrayList<CancellableContinuation<Long>>()
6263

63-
override fun handle(now: Long) {
64-
val cur = next.toTypedArray()
65-
next.clear()
66-
for (cont in cur)
67-
cont.resume(now)
68-
}
64+
override fun handle(now: Long) {
65+
val cur = next.toTypedArray()
66+
next.clear()
67+
for (cont in cur)
68+
with (cont) { resumeUndispatched(now) }
69+
}
6970

70-
fun onNext(cont: CancellableContinuation<Long>) {
71-
next += cont
71+
fun onNext(cont: CancellableContinuation<Long>) {
72+
next += cont
73+
}
7274
}
75+
76+
override fun toString() = "JavaFx"
7377
}

kotlinx-coroutines-swing/pom.xml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,13 @@
4040
<version>${project.version}</version>
4141
<scope>compile</scope>
4242
</dependency>
43+
<dependency>
44+
<groupId>org.jetbrains.kotlinx</groupId>
45+
<artifactId>kotlinx-coroutines-core</artifactId>
46+
<version>${project.version}</version>
47+
<classifier>tests</classifier>
48+
<scope>test</scope>
49+
</dependency>
4350
<dependency>
4451
<groupId>org.jetbrains.kotlinx</groupId>
4552
<artifactId>kotlinx-coroutines-jdk8</artifactId>

kotlinx-coroutines-swing/src/main/kotlin/kotlinx/coroutines/experimental/swing/Swing.kt

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,15 +30,19 @@ import kotlin.coroutines.experimental.CoroutineContext
3030
* Dispatches execution onto Swing event dispatching thread and provides native [delay] support.
3131
*/
3232
object Swing : CoroutineDispatcher(), Delay {
33-
override fun isDispatchNeeded(context: CoroutineContext): Boolean = !SwingUtilities.isEventDispatchThread()
3433
override fun dispatch(context: CoroutineContext, block: Runnable) = SwingUtilities.invokeLater(block)
3534

3635
override fun scheduleResumeAfterDelay(time: Long, unit: TimeUnit, continuation: CancellableContinuation<Unit>) {
3736
val timerTime = unit.toMillis(time).coerceAtMost(Int.MAX_VALUE.toLong()).toInt()
38-
val timer = Timer(timerTime, ActionListener { continuation.resume(Unit) }).apply {
37+
val action = ActionListener {
38+
with(continuation) { resumeUndispatched(Unit) }
39+
}
40+
val timer = Timer(timerTime, action).apply {
3941
isRepeats = false
4042
start()
4143
}
4244
continuation.onCompletion { timer.stop() }
4345
}
46+
47+
override fun toString() = "Swing"
4448
}

0 commit comments

Comments
 (0)