File tree Expand file tree Collapse file tree 3 files changed +9
-5
lines changed
kotlinx-coroutines-reactive/src/main/kotlin/kotlinx/coroutines/experimental/reactive
kotlinx-coroutines-reactor/src/main/kotlin/kotlinx/coroutines/experimental/reactor Expand file tree Collapse file tree 3 files changed +9
-5
lines changed Original file line number Diff line number Diff line change 1616
1717package kotlinx.coroutines.experimental.reactive
1818
19+ import kotlinx.coroutines.experimental.DefaultDispatcher
1920import kotlinx.coroutines.experimental.channels.ReceiveChannel
2021import org.reactivestreams.Publisher
2122import kotlin.coroutines.experimental.CoroutineContext
@@ -28,7 +29,7 @@ import kotlin.coroutines.experimental.CoroutineContext
2829 *
2930 * @param context -- the coroutine context from which the resulting observable is going to be signalled
3031 */
31- public fun <T > ReceiveChannel<T>.asPublisher (context : CoroutineContext ): Publisher <T > = publish(context) {
32+ public fun <T > ReceiveChannel<T>.asPublisher (context : CoroutineContext = DefaultDispatcher ): Publisher <T > = publish(context) {
3233 for (t in this @asPublisher)
3334 send(t)
3435}
Original file line number Diff line number Diff line change 11package kotlinx.coroutines.experimental.reactor
22
3+ import kotlinx.coroutines.experimental.DefaultDispatcher
34import kotlinx.coroutines.experimental.Deferred
45import kotlinx.coroutines.experimental.Job
56import kotlinx.coroutines.experimental.channels.ReceiveChannel
@@ -16,7 +17,7 @@ import kotlin.coroutines.experimental.CoroutineContext
1617 *
1718 * @param context -- the coroutine context from which the resulting mono is going to be signalled
1819 */
19- public fun Job.asMono (context : CoroutineContext ): Mono <Unit > = mono(context) { this @asMono.join() }
20+ public fun Job.asMono (context : CoroutineContext = DefaultDispatcher ): Mono <Unit > = mono(context) { this @asMono.join() }
2021
2122/* *
2223 * Converts this deferred value to the hot reactive mono that signals
@@ -27,7 +28,7 @@ public fun Job.asMono(context: CoroutineContext): Mono<Unit> = mono(context) { t
2728 *
2829 * @param context -- the coroutine context from which the resulting mono is going to be signalled
2930 */
30- public fun <T > Deferred<T?>.asMono (context : CoroutineContext ): Mono <T > = mono(context) { this @asMono.await() }
31+ public fun <T > Deferred<T?>.asMono (context : CoroutineContext = DefaultDispatcher ): Mono <T > = mono(context) { this @asMono.await() }
3132
3233/* *
3334 * Converts a stream of elements received from the channel to the hot reactive flux.
@@ -37,7 +38,7 @@ public fun <T> Deferred<T?>.asMono(context: CoroutineContext): Mono<T> = mono(co
3738 *
3839 * @param context -- the coroutine context from which the resulting flux is going to be signalled
3940 */
40- public fun <T > ReceiveChannel<T>.asFlux (context : CoroutineContext ): Flux <T > = flux(context) {
41+ public fun <T > ReceiveChannel<T>.asFlux (context : CoroutineContext = DefaultDispatcher ): Flux <T > = flux(context) {
4142 for (t in this @asFlux)
4243 send(t)
4344}
Original file line number Diff line number Diff line change 11package kotlinx.coroutines.experimental.reactor
22
3+ import kotlinx.coroutines.experimental.DefaultDispatcher
34import kotlinx.coroutines.experimental.channels.ProducerScope
45import kotlinx.coroutines.experimental.reactive.publish
56import reactor.core.publisher.Flux
@@ -19,7 +20,8 @@ import kotlin.coroutines.experimental.CoroutineContext
1920 * | Normal completion or `close` without cause | `onComplete`
2021 * | Failure with exception or `close` with cause | `onError`
2122 */
23+ @JvmOverloads // for binary compatibility with older code compiled before context had a default
2224fun <T > flux (
23- context : CoroutineContext ,
25+ context : CoroutineContext = DefaultDispatcher ,
2426 block : suspend ProducerScope <T >.() -> Unit
2527): Flux <T > = Flux .from(publish(context, block))
You can’t perform that action at this time.
0 commit comments