File tree Expand file tree Collapse file tree 4 files changed +36
-4
lines changed
kotlinx-coroutines-core/common Expand file tree Collapse file tree 4 files changed +36
-4
lines changed Original file line number Diff line number Diff line change @@ -30,6 +30,7 @@ public suspend fun <T> FlowCollector<T>.emitAll(channel: ReceiveChannel<T>): Uni
3030 emitAllImpl(channel, consume = true )
3131
3232private suspend fun <T > FlowCollector<T>.emitAllImpl (channel : ReceiveChannel <T >, consume : Boolean ) {
33+ ensureActive()
3334 // Manually inlined "consumeEach" implementation that does not use iterator but works via "receiveCatching".
3435 // It has smaller and more efficient spilled state which also allows to implement a manual kludge to
3536 // fix retention of the last emitted value.
Original file line number Diff line number Diff line change @@ -194,7 +194,15 @@ public fun <T> Flow<T>.onEmpty(
194194 }
195195}
196196
197- private class ThrowingCollector (private val e : Throwable ) : FlowCollector<Any?> {
197+ /*
198+ * 'emitAll' methods call this to fail-fast before starting to collect
199+ * their sources (that may not have any elements for a long time).
200+ */
201+ internal fun FlowCollector <* >.ensureActive () {
202+ if (this is ThrowingCollector ) throw e
203+ }
204+
205+ internal class ThrowingCollector (@JvmField val e : Throwable ) : FlowCollector<Any?> {
198206 override suspend fun emit (value : Any? ) {
199207 throw e
200208 }
Original file line number Diff line number Diff line change @@ -127,5 +127,7 @@ public suspend fun <T> Flow<T>.collectLatest(action: suspend (value: T) -> Unit)
127127 * Collects all the values from the given [flow] and emits them to the collector.
128128 * It is a shorthand for `flow.collect { value -> emit(value) }`.
129129 */
130- @BuilderInference
131- public suspend inline fun <T > FlowCollector<T>.emitAll (flow : Flow <T >): Unit = flow.collect(this )
130+ public suspend fun <T > FlowCollector<T>.emitAll (flow : Flow <T >) {
131+ ensureActive()
132+ flow.collect(this )
133+ }
Original file line number Diff line number Diff line change 11/*
2- * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
2+ * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
33 */
44
55package kotlinx.coroutines.flow
66
77import kotlinx.coroutines.*
8+ import kotlinx.coroutines.channels.*
89import kotlinx.coroutines.flow.internal.*
910import kotlin.test.*
1011
@@ -290,4 +291,24 @@ class OnCompletionTest : TestBase() {
290291 val expected = (1 .. 5 ).toList() + (- 1 )
291292 assertEquals(expected, result)
292293 }
294+
295+ @Test
296+ fun testCancelledEmitAllFlow () = runTest {
297+ // emitAll does not call 'collect' on onCompletion collector
298+ // if the target flow is empty
299+ flowOf(1 , 2 , 3 )
300+ .onCompletion { emitAll(MutableSharedFlow ()) }
301+ .take(1 )
302+ .collect()
303+ }
304+
305+ @Test
306+ fun testCancelledEmitAllChannel () = runTest {
307+ // emitAll does not call 'collect' on onCompletion collector
308+ // if the target channel is empty
309+ flowOf(1 , 2 , 3 )
310+ .onCompletion { emitAll(Channel ()) }
311+ .take(1 )
312+ .collect()
313+ }
293314}
You can’t perform that action at this time.
0 commit comments