@@ -59,6 +59,9 @@ private class PublisherCoroutine<in T>(
5959 private val subscriber : Subscriber <T >
6060) : AbstractCoroutine<Unit>(parentContext, true ), ProducerScope<T>, Subscription, SelectClause2<T, SendChannel<T>> {
6161 override val channel: SendChannel <T > get() = this
62+
63+ // cancelsParent == true ensure that error is always reported to the parent, so that parent cannot complete
64+ // without receiving reported error.
6265 override val cancelsParent: Boolean get() = true
6366
6467 // Mutex is locked when either nRequested == 0 or while subscriber.onXXX is being invoked
@@ -69,6 +72,8 @@ private class PublisherCoroutine<in T>(
6972 @Volatile
7073 private var cancelled = false // true when Subscription.cancel() is invoked
7174
75+ private var handleException = false // when handleJobException is invoked
76+
7277 override val isClosedForSend: Boolean get() = isCompleted
7378 override val isFull: Boolean = mutex.isLocked
7479 override fun close (cause : Throwable ? ): Boolean = cancel(cause)
@@ -105,68 +110,79 @@ private class PublisherCoroutine<in T>(
105110 }
106111 }
107112
113+ /*
114+ This code is not trivial because of the two properties:
115+ 1. It ensures conformance to the reactive specification that mandates that onXXX invocations should not
116+ be concurrent. It uses Mutex to protect all onXXX invocation and ensure conformance even when multiple
117+ coroutines are invoking `send` function.
118+ 2. Normally, `onComplete/onError` notification is sent only when coroutine and all its children are complete.
119+ However, nothing prevents `publish` coroutine from leaking reference to it send channel to some
120+ globally-scoped coroutine that is invoking `send` outside of this context. Without extra precaution this may
121+ lead to `onNext` that is concurrent with `onComplete/onError`, so that is why signalling for
122+ `onComplete/onError` is also done under the same mutex.
123+ */
124+
108125 // assert: mutex.isLocked()
109126 private fun doLockedNext (elem : T ) {
110- // check if already closed for send
127+ // check if already closed for send, note, that isActive become false as soon as cancel() is invoked,
128+ // because the job is cancelled, so this check also ensure conformance to the reactive specification's
129+ // requirement that after cancellation requested we don't call onXXX
111130 if (! isActive) {
112- doLockedSignalCompleted ()
131+ unlockAndCheckCompleted ()
113132 throw getCancellationException()
114133 }
115134 // notify subscriber
116- try {
117- subscriber.onNext(elem)
118- } catch (e: Throwable ) {
119- try {
120- if (! cancel(e))
121- handleCoroutineException(context, e, this )
122- } finally {
123- doLockedSignalCompleted()
124- }
125- throw getCancellationException()
126- }
135+ subscriber.onNext(elem)
127136 // now update nRequested
128137 while (true ) { // lock-free loop on nRequested
129138 val cur = _nRequested .value
130139 if (cur < 0 ) break // closed from inside onNext => unlock
131140 if (cur == Long .MAX_VALUE ) break // no back-pressure => unlock
132141 val upd = cur - 1
133142 if (_nRequested .compareAndSet(cur, upd)) {
134- if (upd == 0L ) return // return to keep locked due to back-pressure
143+ if (upd == 0L ) {
144+ // return to keep locked due to back-pressure
145+ return
146+ }
135147 break // unlock if upd > 0
136148 }
137149 }
138- /*
139- There is no sense to check for `isActive` before doing `unlock`, because cancellation/completion might
140- happen after this check and before `unlock` (see `onCancellation` that does not do anything
141- if it fails to acquire the lock that we are still holding).
142- We have to recheck `isActive` after `unlock` anyway.
143- */
150+ unlockAndCheckCompleted()
151+ }
152+
153+ private fun unlockAndCheckCompleted () {
154+ /*
155+ There is no sense to check completion before doing `unlock`, because completion might
156+ happen after this check and before `unlock` (see `signalCompleted` that does not do anything
157+ if it fails to acquire the lock that we are still holding).
158+ We have to recheck `isCompleted` after `unlock` anyway.
159+ */
144160 mutex.unlock()
145- // recheck isActive
146- if (! isActive && mutex.tryLock())
147- doLockedSignalCompleted()
161+ // check isCompleted and and try to regain lock to signal completion
162+ if (isCompleted && mutex.tryLock()) doLockedSignalCompleted()
148163 }
149164
150- // assert: mutex.isLocked()
165+ // assert: mutex.isLocked() & isCompleted
151166 private fun doLockedSignalCompleted () {
152167 try {
153168 if (_nRequested .value >= CLOSED ) {
154169 _nRequested .value = SIGNALLED // we'll signal onError/onCompleted (that the final state -- no CAS needed)
155170 val cause = getCompletionCause()
156171 // Specification requires that after cancellation requested we don't call onXXX
157172 if (cancelled) {
158- // but we cannot just ignore exception so we handle it
159- if (cause != null ) handleCoroutineException(context, cause, this )
160- return
161- }
162- try {
163- if (cause != null && cause !is CancellationException )
164- subscriber.onError(cause)
165- else
166- subscriber.onComplete()
167- } catch (e: Throwable ) {
168- handleCoroutineException(context, e, this )
169- }
173+ // If the parent had failed to handle our exception (handleJobException was invoked), then
174+ // we must not loose this exception
175+ if (handleException && cause != null ) handleExceptionViaHandler(parentContext, cause)
176+ } else {
177+ try {
178+ if (cause != null && cause !is CancellationException )
179+ subscriber.onError(cause)
180+ else
181+ subscriber.onComplete()
182+ } catch (e: Throwable ) {
183+ handleExceptionViaHandler(parentContext, e)
184+ }
185+ }
170186 }
171187 } finally {
172188 mutex.unlock()
@@ -189,37 +205,48 @@ private class PublisherCoroutine<in T>(
189205 if (_nRequested .compareAndSet(cur, upd)) {
190206 // unlock the mutex when we don't have back-pressure anymore
191207 if (cur == 0L ) {
192- mutex.unlock()
193- // recheck isActive
194- if (! isActive && mutex.tryLock())
195- doLockedSignalCompleted()
208+ unlockAndCheckCompleted()
196209 }
197210 return
198211 }
199212 }
200213 }
201214
202- override fun onCompletedExceptionally (exception : Throwable ) = onCompleted(Unit )
203-
204- override fun onCompleted (value : Unit ) {
215+ // assert: isCompleted
216+ private fun signalCompleted () {
205217 while (true ) { // lock-free loop for nRequested
206218 val cur = _nRequested .value
207219 if (cur == SIGNALLED ) return // some other thread holding lock already signalled cancellation/completion
208- check(cur >= 0 ) // no other thread could have marked it as CLOSED, because onCancellation is invoked once
220+ check(cur >= 0 ) // no other thread could have marked it as CLOSED, because onCompleted[Exceptionally] is invoked once
209221 if (! _nRequested .compareAndSet(cur, CLOSED )) continue // retry on failed CAS
210222 // Ok -- marked as CLOSED, now can unlock the mutex if it was locked due to backpressure
211223 if (cur == 0L ) {
212224 doLockedSignalCompleted()
213225 } else {
214226 // otherwise mutex was either not locked or locked in concurrent onNext... try lock it to signal completion
215- if (mutex.tryLock())
216- doLockedSignalCompleted()
227+ if (mutex.tryLock()) doLockedSignalCompleted()
217228 // Note: if failed `tryLock`, then `doLockedNext` will signal after performing `unlock`
218229 }
219230 return // done anyway
220231 }
221232 }
222233
234+ // Note: It is invoked when parent fails to handle an exception and strictly before onCompleted[Exception]
235+ // so here we just raise a flag (and it need NOT be volatile!) to handle this exception.
236+ // This way we defer decision to handle this exception based on our ability to send this exception
237+ // to the subscriber (see doLockedSignalCompleted)
238+ override fun handleJobException (exception : Throwable ) {
239+ handleException = true
240+ }
241+
242+ override fun onCompletedExceptionally (exception : Throwable ) {
243+ signalCompleted()
244+ }
245+
246+ override fun onCompleted (value : Unit ) {
247+ signalCompleted()
248+ }
249+
223250 override fun cancel () {
224251 // Specification requires that after cancellation publisher stops signalling
225252 // This flag distinguishes subscription cancellation request from the job crash
0 commit comments