@@ -10,27 +10,48 @@ import kotlinx.coroutines.channels.Channel.Factory.UNLIMITED
1010import kotlinx.coroutines.intrinsics.*
1111import kotlin.coroutines.*
1212import kotlin.coroutines.intrinsics.*
13+ import kotlin.native.concurrent.*
1314
1415/* *
1516 * Broadcasts all elements of the channel.
17+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
1618 *
1719 * The kind of the resulting channel depends on the specified [capacity] parameter:
1820 * when `capacity` is positive (1 by default), but less than [UNLIMITED] -- uses `ArrayBroadcastChannel` with a buffer of given capacity,
1921 * when `capacity` is [CONFLATED] -- uses [ConflatedBroadcastChannel] that conflates back-to-back sends;
2022 * Note that resulting channel behaves like [ConflatedBroadcastChannel] but is not an instance of [ConflatedBroadcastChannel].
2123 * otherwise -- throws [IllegalArgumentException].
2224 *
25+ * ### Cancelling broadcast
26+ *
27+ * **To stop broadcasting from the underlying channel call [cancel][BroadcastChannel.cancel] on the result.**
28+ *
29+ * Do not use [close][BroadcastChannel.close] on the resulting channel.
30+ * It causes eventual failure of the broadcast coroutine and cancellation of the underlying channel, too,
31+ * but it is not as prompt.
32+ *
33+ * ### Future replacement
34+ *
35+ * This function has an inappropriate result type of [BroadcastChannel] which provides
36+ * [send][BroadcastChannel.send] and [close][BroadcastChannel.close] operations that interfere with
37+ * the broadcasting coroutine in hard-to-specify ways. It will be replaced with
38+ * sharing operators on [Flow][kotlinx.coroutines.flow.Flow] in the future.
39+ *
2340 * @param start coroutine start option. The default value is [CoroutineStart.LAZY].
2441 */
2542fun <E > ReceiveChannel<E>.broadcast (
2643 capacity : Int = 1,
2744 start : CoroutineStart = CoroutineStart .LAZY
28- ): BroadcastChannel <E > =
29- GlobalScope .broadcast(Dispatchers .Unconfined , capacity = capacity, start = start, onCompletion = consumes()) {
45+ ): BroadcastChannel <E > {
46+ val scope = GlobalScope + Dispatchers .Unconfined + CoroutineExceptionHandler { _, _ -> }
47+ // We can run this coroutine in the context that ignores all exceptions, because of `onCompletion = consume()`
48+ // which passes all exceptions upstream to the source ReceiveChannel
49+ return scope.broadcast(capacity = capacity, start = start, onCompletion = consumes()) {
3050 for (e in this @broadcast) {
3151 send(e)
3252 }
3353 }
54+ }
3455
3556/* *
3657 * Launches new coroutine to produce a stream of values by sending them to a broadcast channel
@@ -63,6 +84,21 @@ fun <E> ReceiveChannel<E>.broadcast(
6384 *
6485 * See [newCoroutineContext] for a description of debugging facilities that are available for newly created coroutine.
6586 *
87+ * ### Cancelling broadcast
88+ *
89+ * **To stop broadcasting from the underlying channel call [cancel][BroadcastChannel.cancel] on the result.**
90+ *
91+ * Do not use [close][BroadcastChannel.close] on the resulting channel.
92+ * It causes failure of the `send` operation in broadcast coroutine and would not cancel it if the
93+ * coroutine is doing something else.
94+ *
95+ * ### Future replacement
96+ *
97+ * This function has an inappropriate result type of [BroadcastChannel] which provides
98+ * [send][BroadcastChannel.send] and [close][BroadcastChannel.close] operations that interfere with
99+ * the broadcasting coroutine in hard-to-specify ways. It will be replaced with
100+ * sharing operators on [Flow][kotlinx.coroutines.flow.Flow] in the future.
101+ *
66102 * @param context additional to [CoroutineScope.coroutineContext] context of the coroutine.
67103 * @param capacity capacity of the channel's buffer (1 by default).
68104 * @param start coroutine start option. The default value is [CoroutineStart.LAZY].
@@ -107,8 +143,9 @@ private open class BroadcastCoroutine<E>(
107143 }
108144
109145 override fun cancelInternal (cause : Throwable ) {
110- _channel .cancel(cause.toCancellationException()) // cancel the channel
111- cancelCoroutine(cause) // cancel the job
146+ val exception = cause.toCancellationException()
147+ _channel .cancel(exception) // cancel the channel
148+ cancelCoroutine(exception) // cancel the job
112149 }
113150
114151 override fun onCompleted (value : Unit ) {
@@ -119,6 +156,13 @@ private open class BroadcastCoroutine<E>(
119156 val processed = _channel .close(cause)
120157 if (! processed && ! handled) handleCoroutineException(context, cause)
121158 }
159+
160+ // The BroadcastChannel could be also closed
161+ override fun close (cause : Throwable ? ): Boolean {
162+ val result = _channel .close(cause)
163+ start() // start coroutine if it was not started yet
164+ return result
165+ }
122166}
123167
124168private class LazyBroadcastCoroutine <E >(
0 commit comments