1616
1717package kotlinx.coroutines.experimental
1818
19- import kotlinx.coroutines.experimental.intrinsics.startCoroutineUndispatched
2019import kotlinx.coroutines.experimental.selects.SelectBuilder
2120import kotlinx.coroutines.experimental.selects.select
2221import java.util.concurrent.ScheduledExecutorService
2322import java.util.concurrent.ScheduledThreadPoolExecutor
2423import java.util.concurrent.TimeUnit
2524import kotlin.coroutines.experimental.Continuation
2625import kotlin.coroutines.experimental.ContinuationInterceptor
26+ import kotlin.coroutines.experimental.CoroutineContext
27+ import kotlin.coroutines.experimental.intrinsics.startCoroutineUninterceptedOrReturn
2728import kotlin.coroutines.experimental.intrinsics.suspendCoroutineOrReturn
2829
2930private val KEEP_ALIVE = java.lang.Long .getLong(" kotlinx.coroutines.ScheduledExecutor.keepAlive" , 1000L )
@@ -68,9 +69,10 @@ internal fun scheduledExecutorShutdownNowAndRelease() {
6869 * Runs a given suspending [block] of code inside a coroutine with a specified timeout and throws
6970 * [CancellationException] if timeout was exceeded.
7071 *
71- * The code that is executing inside the [block] is cancelled on timeout and throws [CancellationException]
72- * exception inside of it, too. However, even the code in the block suppresses the exception,
73- * this `withTimeout` function invocation still throws [CancellationException].
72+ * The code that is executing inside the [block] is cancelled on timeout and the active or next invocation of
73+ * cancellable suspending function inside the block throws [CancellationException], so normally that exception,
74+ * if uncaught, also gets thrown by `withTimeout` as a result.
75+ * However, the code in the block can suppresses [CancellationException].
7476 *
7577 * The sibling function that does not throw exception on timeout is [withTimeoutOrNull].
7678 * Note, that timeout action can be specified for [select] invocation with [onTimeout][SelectBuilder.onTimeout] clause.
@@ -84,27 +86,40 @@ internal fun scheduledExecutorShutdownNowAndRelease() {
8486public suspend fun <T > withTimeout (time : Long , unit : TimeUnit = TimeUnit .MILLISECONDS , block : suspend () -> T ): T {
8587 require(time >= 0 ) { " Timeout time $time cannot be negative" }
8688 if (time <= 0L ) throw CancellationException (" Timed out immediately" )
87- return suspendCoroutineOrReturn sc@ { delegate: Continuation <T > ->
88- // schedule cancellation of this continuation on time
89- val cont = TimeoutExceptionContinuation (time, unit, delegate)
90- val delay = cont.context[ContinuationInterceptor ] as ? Delay
89+ return suspendCoroutineOrReturn { cont: Continuation <T > ->
90+ val context = cont.context
91+ val coroutine = TimeoutExceptionCoroutine (time, unit, cont)
92+ val delay = context[ContinuationInterceptor ] as ? Delay
93+ // schedule cancellation of this coroutine on time
9194 if (delay != null )
92- cont.disposeOnCompletion(delay.invokeOnTimeout(time, unit, cont)) else
93- cont.cancelFutureOnCompletion(scheduledExecutor.schedule(cont, time, unit))
94- // restart block using cancellable context of this continuation,
95+ coroutine.disposeOnCompletion(delay.invokeOnTimeout(time, unit, coroutine)) else
96+ coroutine.cancelFutureOnCompletion(scheduledExecutor.schedule(coroutine, time, unit))
97+ coroutine.initParentJob(context[Job ])
98+ // restart block using new coroutine with new job,
9599 // however start it as undispatched coroutine, because we are already in the proper context
96- block.startCoroutineUndispatched(cont)
97- cont.getResult()
100+ block.startCoroutineUninterceptedOrReturn(coroutine)
98101 }
99102}
100103
104+ private class TimeoutExceptionCoroutine <in T >(
105+ private val time : Long ,
106+ private val unit : TimeUnit ,
107+ private val cont : Continuation <T >
108+ ) : JobSupport(active = true ), Runnable, Continuation<T> {
109+ override val context: CoroutineContext = cont.context + this // mix in this Job into the context
110+ override fun run () { cancel(TimeoutException (time, unit)) }
111+ override fun resume (value : T ) { cont.resumeDirect(value) }
112+ override fun resumeWithException (exception : Throwable ) { cont.resumeDirectWithException(exception) }
113+ }
114+
101115/* *
102116 * Runs a given suspending block of code inside a coroutine with a specified timeout and returns
103117 * `null` if timeout was exceeded.
104118 *
105- * The code that is executing inside the [block] is cancelled on timeout and throws [CancellationException]
106- * exception inside of it. However, even the code in the block does not catch the cancellation exception,
107- * this `withTimeoutOrNull` function invocation still returns `null` on timeout.
119+ * The code that is executing inside the [block] is cancelled on timeout and the active or next invocation of
120+ * cancellable suspending function inside the block throws [CancellationException]. Normally that exception,
121+ * if uncaught by the block, gets converted into the `null` result of `withTimeoutOrNull`.
122+ * However, the code in the block can suppresses [CancellationException].
108123 *
109124 * The sibling function that throws exception on timeout is [withTimeout].
110125 * Note, that timeout action can be specified for [select] invocation with [onTimeout][SelectBuilder.onTimeout] clause.
@@ -118,33 +133,39 @@ public suspend fun <T> withTimeout(time: Long, unit: TimeUnit = TimeUnit.MILLISE
118133public suspend fun <T > withTimeoutOrNull (time : Long , unit : TimeUnit = TimeUnit .MILLISECONDS , block : suspend () -> T ): T ? {
119134 require(time >= 0 ) { " Timeout time $time cannot be negative" }
120135 if (time <= 0L ) return null
121- return suspendCoroutineOrReturn sc@ { delegate: Continuation <T ?> ->
122- // schedule cancellation of this continuation on time
123- val cont = TimeoutNullContinuation <T >(delegate)
124- val delay = cont.context[ContinuationInterceptor ] as ? Delay
136+ return suspendCoroutineOrReturn { cont: Continuation <T ?> ->
137+ val context = cont.context
138+ val coroutine = TimeoutNullCoroutine (time, unit, cont)
139+ val delay = context[ContinuationInterceptor ] as ? Delay
140+ // schedule cancellation of this coroutine on time
125141 if (delay != null )
126- cont.disposeOnCompletion(delay.invokeOnTimeout(time, unit, cont)) else
127- cont.cancelFutureOnCompletion(scheduledExecutor.schedule(cont, time, unit))
128- // restart block using cancellable context of this continuation,
142+ coroutine.disposeOnCompletion(delay.invokeOnTimeout(time, unit, coroutine)) else
143+ coroutine.cancelFutureOnCompletion(scheduledExecutor.schedule(coroutine, time, unit))
144+ coroutine.initParentJob(context[Job ])
145+ // restart block using new coroutine with new job,
129146 // however start it as undispatched coroutine, because we are already in the proper context
130- block.startCoroutineUndispatched(cont)
131- cont.getResult()
147+ try {
148+ block.startCoroutineUninterceptedOrReturn(coroutine)
149+ } catch (e: TimeoutException ) {
150+ null // replace inner timeout exception with null result
151+ }
132152 }
133153}
134154
135- private class TimeoutExceptionContinuation <in T >(
155+ private class TimeoutNullCoroutine <in T >(
136156 private val time : Long ,
137157 private val unit : TimeUnit ,
138- delegate : Continuation <T >
139- ) : CancellableContinuationImpl<T>(delegate, active = true ), Runnable {
140- override val defaultResumeMode get() = MODE_DIRECT
141- override fun run () { cancel(CancellationException (" Timed out waiting for $time $unit " )) }
158+ private val cont : Continuation <T ?>
159+ ) : JobSupport(active = true ), Runnable, Continuation<T> {
160+ override val context: CoroutineContext = cont.context + this // mix in this Job into the context
161+ override fun run () { cancel(TimeoutException (time, unit)) }
162+ override fun resume (value : T ) { cont.resumeDirect(value) }
163+ override fun resumeWithException (exception : Throwable ) {
164+ // suppress inner timeout exception and replace it with null
165+ if (exception is TimeoutException )
166+ cont.resumeDirect(null ) else
167+ cont.resumeDirectWithException(exception)
168+ }
142169}
143170
144- private class TimeoutNullContinuation <in T >(
145- delegate : Continuation <T ?>
146- ) : CancellableContinuationImpl<T?>(delegate, active = true ), Runnable {
147- override val defaultResumeMode get() = MODE_DIRECT
148- override val ignoreRepeatedResume: Boolean get() = true
149- override fun run () { resume(null , mode = 0 ) /* dispatch resume */ }
150- }
171+ private class TimeoutException (time : Long , unit : TimeUnit ) : CancellationException(" Timed out waiting for $time $unit " )
0 commit comments