@@ -14,20 +14,20 @@ import reactor.util.context.*
1414import kotlin.coroutines.*
1515
1616/* *
17- * Creates cold reactive [Flux] that runs a given [block] in a coroutine.
17+ * Creates a cold reactive [Flux] that runs the given [block] in a coroutine.
1818 * Every time the returned flux is subscribed, it starts a new coroutine in the specified [context].
19- * Coroutine emits ([Subscriber.onNext]) values with ` send` , completes ([Subscriber.onComplete])
20- * when the coroutine completes or channel is explicitly closed and emits error ([Subscriber.onError])
21- * if coroutine throws an exception or closes channel with a cause.
22- * Unsubscribing cancels running coroutine.
19+ * The coroutine emits ([Subscriber.onNext]) values with [ send][ProducerScope.send] , completes ([Subscriber.onComplete])
20+ * when the coroutine completes, or, in case the coroutine throws an exception or the channel is closed,
21+ * emits the error ([Subscriber.onError]) and closes the channel with the cause.
22+ * Unsubscribing cancels the running coroutine.
2323 *
24- * Invocations of `send` are suspended appropriately when subscribers apply back-pressure and to ensure that
25- * `onNext` is not invoked concurrently.
26- *
27- * Method throws [IllegalArgumentException] if provided [context] contains a [Job] instance.
24+ * Invocations of [send][ProducerScope.send] are suspended appropriately when subscribers apply back-pressure and to
25+ * ensure that [onNext][Subscriber.onNext] is not invoked concurrently.
2826 *
2927 * **Note: This is an experimental api.** Behaviour of publishers that work as children in a parent scope with respect
3028 * to cancellation and error handling may change in the future.
29+ *
30+ * @throws IllegalArgumentException if the provided [context] contains a [Job] instance.
3131 */
3232@ExperimentalCoroutinesApi
3333public fun <T > flux (
@@ -43,12 +43,13 @@ private fun <T> reactorPublish(
4343 scope : CoroutineScope ,
4444 context : CoroutineContext = EmptyCoroutineContext ,
4545 @BuilderInference block : suspend ProducerScope <T >.() -> Unit
46- ): Publisher <T > = Publisher { subscriber ->
47- // specification requires NPE on null subscriber
48- if (subscriber == null ) throw NullPointerException (" Subscriber cannot be null" )
49- require(subscriber is CoreSubscriber ) { " Subscriber is not an instance of CoreSubscriber, context can not be extracted." }
46+ ): Publisher <T > = Publisher onSubscribe@{ subscriber: Subscriber <in T >? ->
47+ if (subscriber !is CoreSubscriber ) {
48+ subscriber.reject(IllegalArgumentException (" Subscriber is not an instance of CoreSubscriber, context can not be extracted." ))
49+ return @onSubscribe
50+ }
5051 val currentContext = subscriber.currentContext()
51- val reactorContext = ( context[ ReactorContext ]?.context?.putAll (currentContext) ? : currentContext).asCoroutineContext( )
52+ val reactorContext = context.extendReactorContext (currentContext)
5253 val newContext = scope.newCoroutineContext(context + reactorContext)
5354 val coroutine = PublisherCoroutine (newContext, subscriber, REACTOR_HANDLER )
5455 subscriber.onSubscribe(coroutine) // do it first (before starting coroutine), to avoid unnecessary suspensions
@@ -66,6 +67,23 @@ private val REACTOR_HANDLER: (Throwable, CoroutineContext) -> Unit = { cause, ct
6667 }
6768}
6869
70+ /* * The proper way to reject the subscriber, according to
71+ * [the reactive spec](https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.3/README.md#1.9)
72+ */
73+ private fun <T > Subscriber<T>?.reject (t : Throwable ) {
74+ if (this == null )
75+ throw NullPointerException (" The subscriber can not be null" )
76+ onSubscribe(object : Subscription {
77+ override fun request (n : Long ) {
78+ // intentionally left blank
79+ }
80+ override fun cancel () {
81+ // intentionally left blank
82+ }
83+ })
84+ onError(t)
85+ }
86+
6987@Deprecated(
7088 message = " CoroutineScope.flux is deprecated in favour of top-level flux" ,
7189 level = DeprecationLevel .HIDDEN ,
0 commit comments