Skip to content

Commit 313978c

Browse files
committed
Introduce deprecated ReceiveChannel#use to preserve backward compatibility with SubscriptionReceiveChannel, fix reactive samples
1 parent c22a1c7 commit 313978c

File tree

6 files changed

+44
-20
lines changed

6 files changed

+44
-20
lines changed

binary-compatibility-validator/build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ sourceSets {
4343
}
4444

4545
test {
46+
dependsOn cleanCompileTestKotlin
4647
dependsOn configurations.testArtifacts
4748

4849
systemProperty 'testCasesClassesDirs', sourceSets.test.output.classesDirs.asPath

binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -581,6 +581,7 @@ public final class kotlinx/coroutines/experimental/channels/BroadcastChannel$Fac
581581

582582
public final class kotlinx/coroutines/experimental/channels/BroadcastChannelKt {
583583
public static final fun BroadcastChannel (I)Lkotlinx/coroutines/experimental/channels/BroadcastChannel;
584+
public static final synthetic fun use (Lkotlinx/coroutines/experimental/channels/ReceiveChannel;Lkotlin/jvm/functions/Function1;)V
584585
}
585586

586587
public abstract interface class kotlinx/coroutines/experimental/channels/Channel : kotlinx/coroutines/experimental/channels/ReceiveChannel, kotlinx/coroutines/experimental/channels/SendChannel {

common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/BroadcastChannel.kt

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,3 +97,18 @@ public interface SubscriptionReceiveChannel<out T> : ReceiveChannel<T>, Closeabl
9797
@Deprecated("Use `cancel`", replaceWith = ReplaceWith("cancel()"))
9898
public override fun close() { cancel() }
9999
}
100+
101+
/** @suppress **Deprecated**: Left here for migration from SubscriptionReceiveChannel */
102+
@Deprecated(level = DeprecationLevel.HIDDEN, message = "Use `ReceiveChannel<*>.consume` instead")
103+
public inline fun <E, R> ReceiveChannel<E>.use(block: (ReceiveChannel<E>) -> R) {
104+
var exception: Throwable? = null
105+
try {
106+
block(this)
107+
} catch (t: Throwable) {
108+
exception = t
109+
throw t
110+
}
111+
finally {
112+
this.cancel(exception)
113+
}
114+
}

reactive/coroutines-guide-reactive.md

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -237,8 +237,8 @@ as shown in the following example:
237237
<!--- INCLUDE
238238
import io.reactivex.*
239239
import kotlinx.coroutines.experimental.*
240+
import kotlinx.coroutines.experimental.channels.*
240241
import kotlinx.coroutines.experimental.reactive.*
241-
import kotlin.coroutines.experimental.*
242242
-->
243243

244244
```kotlin
@@ -247,8 +247,8 @@ fun main(args: Array<String>) = runBlocking<Unit> {
247247
.doOnSubscribe { println("OnSubscribe") } // provide some insight
248248
.doFinally { println("Finally") } // ... into what's going on
249249
var cnt = 0
250-
source.openSubscription().use { channel -> // open channel to the source
251-
for (x in channel) { // iterate over the channel to receive elements from it
250+
source.openSubscription().consume { // open channel to the source
251+
for (x in this) { // iterate over the channel to receive elements from it
252252
println(x)
253253
if (++cnt >= 3) break // break when 3 elements are printed
254254
}
@@ -677,6 +677,7 @@ We need to relay all the elements from the source stream until the other stream
677677
emits anything. However, we have [select] expression to rescue us in coroutines implementation:
678678

679679
<!--- INCLUDE
680+
import kotlinx.coroutines.experimental.channels.*
680681
import kotlinx.coroutines.experimental.*
681682
import kotlinx.coroutines.experimental.reactive.*
682683
import kotlinx.coroutines.experimental.selects.*
@@ -686,11 +687,13 @@ import kotlin.coroutines.experimental.*
686687

687688
```kotlin
688689
fun <T, U> Publisher<T>.takeUntil(context: CoroutineContext, other: Publisher<U>) = publish<T>(context) {
689-
this@takeUntil.openSubscription().use { thisChannel -> // explicitly open channel to Publisher<T>
690-
other.openSubscription().use { otherChannel -> // explicitly open channel to Publisher<U>
690+
this@takeUntil.openSubscription().consume { // explicitly open channel to Publisher<T>
691+
val current = this
692+
other.openSubscription().consume { // explicitly open channel to Publisher<U>
693+
val other = this
691694
whileSelect {
692-
otherChannel.onReceive { false } // bail out on any received element from `other`
693-
thisChannel.onReceive { send(it); true } // resend element from this channel and continue
695+
other.onReceive { false } // bail out on any received element from `other`
696+
current.onReceive { send(it); true } // resend element from this channel and continue
694697
}
695698
}
696699
}

reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-basic-03.kt

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,18 +19,19 @@ package guide.reactive.basic.example03
1919

2020
import io.reactivex.*
2121
import kotlinx.coroutines.experimental.*
22+
import kotlinx.coroutines.experimental.channels.*
2223
import kotlinx.coroutines.experimental.reactive.*
23-
import kotlin.coroutines.experimental.*
2424

2525
fun main(args: Array<String>) = runBlocking<Unit> {
2626
val source = Flowable.range(1, 5) // a range of five numbers
2727
.doOnSubscribe { println("OnSubscribe") } // provide some insight
2828
.doFinally { println("Finally") } // ... into what's going on
2929
var cnt = 0
30-
val channel = source.openSubscription() // open channel to the source
31-
for (x in channel) { // iterate over the channel to receive elements from it
32-
println(x)
33-
if (++cnt >= 3) break // break when 3 elements are printed
30+
source.openSubscription().consume { // open channel to the source
31+
for (x in this) { // iterate over the channel to receive elements from it
32+
println(x)
33+
if (++cnt >= 3) break // break when 3 elements are printed
34+
}
35+
// `use` will close the channel when this block of code is complete
3436
}
35-
channel.cancel() // `cancel` closes the channel
3637
}

reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-operators-03.kt

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,21 +17,24 @@
1717
// This file was automatically generated from coroutines-guide-reactive.md by Knit tool. Do not edit.
1818
package guide.reactive.operators.example03
1919

20+
import kotlinx.coroutines.experimental.channels.*
2021
import kotlinx.coroutines.experimental.*
2122
import kotlinx.coroutines.experimental.reactive.*
2223
import kotlinx.coroutines.experimental.selects.*
2324
import org.reactivestreams.*
2425
import kotlin.coroutines.experimental.*
2526

2627
fun <T, U> Publisher<T>.takeUntil(context: CoroutineContext, other: Publisher<U>) = publish<T>(context) {
27-
val thisChannel = this@takeUntil.openSubscription() // explicitly open channel to Publisher<T>
28-
val otherChannel = other.openSubscription() // explicitly open channel to Publisher<U>
29-
whileSelect {
30-
otherChannel.onReceive { false } // bail out on any received element from `other`
31-
thisChannel.onReceive { send(it); true } // resend element from this channel and continue
28+
this@takeUntil.openSubscription().consume { // explicitly open channel to Publisher<T>
29+
val current = this
30+
other.openSubscription().consume { // explicitly open channel to Publisher<U>
31+
val other = this
32+
whileSelect {
33+
other.onReceive { false } // bail out on any received element from `other`
34+
current.onReceive { send(it); true } // resend element from this channel and continue
35+
}
36+
}
3237
}
33-
thisChannel.cancel()
34-
otherChannel.cancel()
3538
}
3639

3740
fun rangeWithInterval(context: CoroutineContext, time: Long, start: Int, count: Int) = publish<Int>(context) {

0 commit comments

Comments
 (0)