File tree Expand file tree Collapse file tree 6 files changed +107
-8
lines changed
common/kotlinx-coroutines-core-common
kotlinx-coroutines-reactive/test
kotlinx-coroutines-rx2/test Expand file tree Collapse file tree 6 files changed +107
-8
lines changed Original file line number Diff line number Diff line change @@ -277,12 +277,16 @@ internal abstract class AbstractContinuation<in T>(
277277 }
278278 is CancelledContinuation -> {
279279 /*
280- * If continuation was cancelled, then all further updates ( resumes or exceptions) must be
281- * ignored, because cancellation is asynchronous and may race with resume/resumeWithException .
282- * This race is normal.
280+ * If continuation was cancelled, then all further resumes must be
281+ * ignored, because cancellation is asynchronous and may race with resume.
282+ * Racy exception are reported so no exceptions are lost
283283 *
284284 * :todo: we should somehow remember the attempt to invoke resume and fail on the second attempt.
285285 */
286+ if (proposedUpdate is CompletedExceptionally ) {
287+ handleException(proposedUpdate.cause)
288+ }
289+
286290 return
287291 }
288292 else -> error(" Already resumed, but proposed with update $proposedUpdate " )
Original file line number Diff line number Diff line change 11/*
22 * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
33 */
4+ @file:Suppress(" NAMED_ARGUMENTS_NOT_ALLOWED" ) // KT-21913
45
56package kotlinx.coroutines.experimental
67
@@ -71,7 +72,7 @@ class CancellableContinuationTest : TestBase() {
7172 * should be ignored. Here suspended coroutine is cancelled but then resumed with exception.
7273 */
7374 @Test
74- fun testCancelAndResumeWithException () = runTest {
75+ fun testCancelAndResumeWithException () = runTest(unhandled = listOf ({e -> e is TestException })) {
7576 var continuation: Continuation <Unit >? = null
7677 val job = launch(coroutineContext) {
7778 try {
Original file line number Diff line number Diff line change @@ -32,7 +32,7 @@ class IntegrationTest(
3232 @JvmStatic
3333 fun params (): Collection <Array <Any >> = Ctx .values().flatMap { ctx ->
3434 listOf (false , true ).map { delay ->
35- arrayOf< Any > (ctx, delay)
35+ arrayOf(ctx, delay)
3636 }
3737 }
3838 }
@@ -95,6 +95,34 @@ class IntegrationTest(
9595 channel.cancel()
9696 }
9797
98+ @Test
99+ fun testCancelWithoutValue () = runTest {
100+ val job = launch(coroutineContext, parent = Job (), start = CoroutineStart .UNDISPATCHED ) {
101+ publish<String >(coroutineContext) {
102+ yield ()
103+ expectUnreached()
104+ }.awaitFirst()
105+ }
106+
107+ job.cancel()
108+ job.join()
109+ }
110+
111+ @Test
112+ fun testEmptySingle () = runTest(unhandled = listOf ({e -> e is NoSuchElementException })) {
113+ expect(1 )
114+ val job = launch(coroutineContext, parent = Job (), start = CoroutineStart .UNDISPATCHED ) {
115+ publish<String >(coroutineContext) {
116+ yield ()
117+ expect(2 )
118+ // Nothing to emit
119+ }.awaitFirst()
120+ }
121+
122+ job.join()
123+ finish(3 )
124+ }
125+
98126 private suspend fun checkNumbers (n : Int , pub : Publisher <Int >) {
99127 var last = 0
100128 pub.consumeEach {
Original file line number Diff line number Diff line change @@ -120,7 +120,17 @@ private suspend fun <T> Observable<T>.awaitOne(): T = suspendCancellableCoroutin
120120 override fun onStart () { request(1 ) }
121121 override fun onNext (t : T ) { cont.resume(t) }
122122 override fun onCompleted () { if (cont.isActive) cont.resumeWithException(IllegalStateException (" Should have invoked onNext" )) }
123- override fun onError (e : Throwable ) { cont.resumeWithException(e) }
123+ override fun onError (e : Throwable ) {
124+ /*
125+ * Rx1 observable throws NoSuchElementException if cancellation happened before
126+ * element emission. To mitigate this we try to atomically resume continuation with exception:
127+ * if resume failed, then we know that continuation successfully cancelled itself
128+ */
129+ val token = cont.tryResumeWithException(e)
130+ if (token != null ) {
131+ cont.completeResume(token)
132+ }
133+ }
124134 }))
125135}
126136
Original file line number Diff line number Diff line change @@ -32,7 +32,7 @@ class IntegrationTest(
3232 @JvmStatic
3333 fun params (): Collection <Array <Any >> = Ctx .values().flatMap { ctx ->
3434 listOf (false , true ).map { delay ->
35- arrayOf< Any > (ctx, delay)
35+ arrayOf(ctx, delay)
3636 }
3737 }
3838 }
@@ -117,6 +117,34 @@ class IntegrationTest(
117117 channel.cancel()
118118 }
119119
120+ @Test
121+ fun testCancelWithoutValue () = runTest {
122+ val job = launch(coroutineContext, parent = Job (), start = CoroutineStart .UNDISPATCHED ) {
123+ rxObservable<String >(coroutineContext) {
124+ yield ()
125+ expectUnreached()
126+ }.awaitFirst()
127+ }
128+
129+ job.cancel()
130+ job.join()
131+ }
132+
133+ @Test
134+ fun testEmptySingle () = runTest(unhandled = listOf ({e -> e is NoSuchElementException })) {
135+ expect(1 )
136+ val job = launch(coroutineContext, parent = Job (), start = CoroutineStart .UNDISPATCHED ) {
137+ rxObservable<String >(coroutineContext) {
138+ yield ()
139+ expect(2 )
140+ // Nothing to emit
141+ }.awaitFirst()
142+ }
143+
144+ job.join()
145+ finish(3 )
146+ }
147+
120148 private suspend fun checkNumbers (n : Int , observable : Observable <Int >) {
121149 var last = 0
122150 observable.consumeEach {
Original file line number Diff line number Diff line change @@ -32,7 +32,7 @@ class IntegrationTest(
3232 @JvmStatic
3333 fun params (): Collection <Array <Any >> = Ctx .values().flatMap { ctx ->
3434 listOf (false , true ).map { delay ->
35- arrayOf< Any > (ctx, delay)
35+ arrayOf(ctx, delay)
3636 }
3737 }
3838 }
@@ -97,6 +97,34 @@ class IntegrationTest(
9797 channel.cancel()
9898 }
9999
100+ @Test
101+ fun testCancelWithoutValue () = runTest {
102+ val job = launch(coroutineContext, parent = Job (), start = CoroutineStart .UNDISPATCHED ) {
103+ rxObservable<String >(coroutineContext) {
104+ yield ()
105+ expectUnreached()
106+ }.awaitFirst()
107+ }
108+
109+ job.cancel()
110+ job.join()
111+ }
112+
113+ @Test
114+ fun testEmptySingle () = runTest(unhandled = listOf ({e -> e is NoSuchElementException })) {
115+ expect(1 )
116+ val job = launch(coroutineContext, parent = Job (), start = CoroutineStart .UNDISPATCHED ) {
117+ rxObservable<String >(coroutineContext) {
118+ yield ()
119+ expect(2 )
120+ // Nothing to emit
121+ }.awaitFirst()
122+ }
123+
124+ job.join()
125+ finish(3 )
126+ }
127+
100128 private suspend fun checkNumbers (n : Int , observable : Observable <Int >) {
101129 var last = 0
102130 observable.consumeEach {
You can’t perform that action at this time.
0 commit comments