File tree Expand file tree Collapse file tree 5 files changed +51
-7
lines changed
kotlinx-coroutines-core/common Expand file tree Collapse file tree 5 files changed +51
-7
lines changed Original file line number Diff line number Diff line change 11/*
2- * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
2+ * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
33 */
44
55package kotlinx.coroutines.channels
@@ -642,7 +642,13 @@ internal abstract class AbstractChannel<E>(
642642 cancelInternal(cause)
643643
644644 final override fun cancel (cause : CancellationException ? ) {
645- if (isClosedForReceive) return // Do not create an exception if channel is already cancelled
645+ /*
646+ * Do not create an exception if channel is already cancelled.
647+ * Channel is closed for receive when either it is cancelled (then we are free to bail out)
648+ * or was closed and elements were received.
649+ * Then `onCancelIdempotent` does nothing for all implementations.
650+ */
651+ if (isClosedForReceive) return
646652 cancelInternal(cause ? : CancellationException (" $classSimpleName was cancelled" ))
647653 }
648654
Original file line number Diff line number Diff line change 11/*
2- * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
2+ * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
33 */
44
55package kotlinx.coroutines.channels
@@ -26,7 +26,7 @@ internal open class ChannelCoroutine<E>(
2626 }
2727
2828 final override fun cancel (cause : CancellationException ? ) {
29- if (isClosedForReceive ) return // Do not create an exception if channel is already cancelled
29+ if (isCancelled ) return // Do not create an exception if the coroutine (-> the channel) is already cancelled
3030 cancelInternal(cause ? : defaultCancellationException())
3131 }
3232
Original file line number Diff line number Diff line change 1+ /*
2+ * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+ */
4+
15package kotlinx.coroutines.channels
26
37import kotlinx.atomicfu.*
@@ -115,4 +119,4 @@ class ChannelUndeliveredElementTest : TestBase() {
115119 check(! _cancelled .getAndSet(true )) { " Already cancelled" }
116120 }
117121 }
118- }
122+ }
Original file line number Diff line number Diff line change 11/*
2- * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
2+ * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
33 */
44
55package kotlinx.coroutines.channels
@@ -95,6 +95,27 @@ class ProduceTest : TestBase() {
9595 cancelOnCompletion(coroutineContext)
9696 }
9797
98+ @Test
99+ fun testCancelWhenTheChannelIsClosed () = runTest {
100+ val channel = produce<Int > {
101+ send(1 )
102+ close()
103+ expect(2 )
104+ launch {
105+ expect(3 )
106+ hang { expect(5 ) }
107+ }
108+ }
109+
110+ expect(1 )
111+ channel.receive()
112+ yield ()
113+ expect(4 )
114+ channel.cancel()
115+ (channel as Job ).join()
116+ finish(6 )
117+ }
118+
98119 @Test
99120 fun testAwaitConsumerCancellation () = runTest {
100121 val parent = Job ()
Original file line number Diff line number Diff line change 11/*
2- * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
2+ * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
33 */
44
55package kotlinx.coroutines.flow
@@ -194,4 +194,17 @@ class ChannelFlowTest : TestBase() {
194194 assertEquals(listOf (1 ), flow.toList())
195195 finish(3 )
196196 }
197+
198+ @Test
199+ fun testCancelledOnCompletion () = runTest {
200+ val myFlow = callbackFlow<Any > {
201+ expect(2 )
202+ close()
203+ hang { expect(3 ) }
204+ }
205+
206+ expect(1 )
207+ myFlow.collect()
208+ finish(4 )
209+ }
197210}
You can’t perform that action at this time.
0 commit comments