@@ -131,15 +131,19 @@ private class RxObservableCoroutine<T: Any>(
131131 // to abort the corresponding send/offer invocation. From the standpoint of coroutines machinery,
132132 // this failure is essentially equivalent to a failure of a child coroutine.
133133 cancelCoroutine(e)
134- doLockedSignalCompleted(e, false )
134+ mutex.unlock( )
135135 throw e
136136 }
137137 /*
138- There is no sense to check for `isActive` before doing `unlock`, because cancellation/completion might
139- happen after this check and before `unlock` (see signalCompleted that does not do anything
140- if it fails to acquire the lock that we are still holding).
141- We have to recheck `isCompleted` after `unlock` anyway.
138+ * There is no sense to check for `isActive` before doing `unlock`, because cancellation/completion might
139+ * happen after this check and before `unlock` (see signalCompleted that does not do anything
140+ * if it fails to acquire the lock that we are still holding).
141+ * We have to recheck `isCompleted` after `unlock` anyway.
142142 */
143+ unlockAndCheckCompleted()
144+ }
145+
146+ private fun unlockAndCheckCompleted () {
143147 mutex.unlock()
144148 // recheck isActive
145149 if (! isActive && mutex.tryLock())
@@ -148,16 +152,32 @@ private class RxObservableCoroutine<T: Any>(
148152
149153 // assert: mutex.isLocked()
150154 private fun doLockedSignalCompleted (cause : Throwable ? , handled : Boolean ) {
151- // todo: handled is ignored here, might need something like in PublisherCoroutine to process
152155 // cancellation failures
153156 try {
154157 if (_signal .value >= CLOSED ) {
155158 _signal .value = SIGNALLED // we'll signal onError/onCompleted (that the final state -- no CAS needed)
156159 try {
157- if (cause != null && cause !is CancellationException )
158- subscriber.onError(cause)
159- else
160+ if (cause != null && cause !is CancellationException ) {
161+ /*
162+ * Reactive frameworks have two types of exceptions: regular and fatal.
163+ * Regular are passed to onError.
164+ * Fatal can be passed to onError, but implementation **is free to swallow it** (e.g. see #1297).
165+ * Such behaviour is inconsistent, leads to silent failures and we can't possibly know whether
166+ * the cause will be handled by onError (and moreover, it depends on whether a fatal exception was
167+ * thrown by subscriber or upstream).
168+ * To make behaviour consistent and least surprising, we always handle fatal exceptions
169+ * by coroutines machinery, anyway, they should not be present in regular program flow,
170+ * thus our goal here is just to expose it as soon as possible.
171+ */
172+ if (cause.isFatal()) {
173+ if (! handled) handleCoroutineException(context, cause)
174+ } else {
175+ subscriber.onError(cause)
176+ }
177+ }
178+ else {
160179 subscriber.onComplete()
180+ }
161181 } catch (e: Throwable ) {
162182 // Unhandled exception (cannot handle in other way, since we are already complete)
163183 handleCoroutineException(context, e)
@@ -181,4 +201,6 @@ private class RxObservableCoroutine<T: Any>(
181201 override fun onCancelled (cause : Throwable , handled : Boolean ) {
182202 signalCompleted(cause, handled)
183203 }
184- }
204+ }
205+
206+ internal fun Throwable.isFatal () = this is VirtualMachineError || this is ThreadDeath || this is LinkageError
0 commit comments