@@ -19,7 +19,7 @@ package kotlinx.coroutines.experimental.channels
1919import kotlinx.coroutines.experimental.CancellableContinuation
2020import kotlinx.coroutines.experimental.internal.LockFreeLinkedListHead
2121import kotlinx.coroutines.experimental.internal.LockFreeLinkedListNode
22- import kotlinx.coroutines.experimental.removeOnCompletion
22+ import kotlinx.coroutines.experimental.removeOnCancel
2323import kotlinx.coroutines.experimental.suspendCancellableCoroutine
2424
2525/* *
@@ -72,17 +72,17 @@ public abstract class AbstractChannel<E> : Channel<E> {
7272
7373 // ------ SendChannel ------
7474
75- override val isClosedForSend: Boolean get() = closedForSend != null
76- override val isFull: Boolean get() = queue.next() !is ReceiveOrClosed <* > && isBufferFull
75+ public final override val isClosedForSend: Boolean get() = closedForSend != null
76+ public final override val isFull: Boolean get() = queue.next() !is ReceiveOrClosed <* > && isBufferFull
7777
78- suspend override fun send (element : E ) {
78+ public final override suspend fun send (element : E ) {
7979 // fast path -- try offer non-blocking
8080 if (offer(element)) return
8181 // slow-path does suspend
8282 return sendSuspend(element)
8383 }
8484
85- override fun offer (element : E ): Boolean {
85+ public final override fun offer (element : E ): Boolean {
8686 val result = offerInternal(element)
8787 return when {
8888 result == = OFFER_SUCCESS -> true
@@ -96,7 +96,7 @@ public abstract class AbstractChannel<E> : Channel<E> {
9696 loop@ while (true ) {
9797 if (enqueueSend(send)) {
9898 cont.initCancellability() // make it properly cancellable
99- cont.removeOnCompletion (send)
99+ cont.removeOnCancel (send)
100100 return @sc
101101 }
102102 // hm... something is not right. try to offer
@@ -120,13 +120,16 @@ public abstract class AbstractChannel<E> : Channel<E> {
120120 else
121121 queue.addLastIfPrev(send, { it !is ReceiveOrClosed <* > })
122122
123- override fun close (cause : Throwable ? ): Boolean {
123+ public final override fun close (cause : Throwable ? ): Boolean {
124124 val closed = Closed <E >(cause)
125125 while (true ) {
126126 val receive = takeFirstReceiveOrPeekClosed()
127127 if (receive == null ) {
128128 // queue empty or has only senders -- try add last "Closed" item to the queue
129- if (queue.addLastIfPrev(closed, { it !is ReceiveOrClosed <* > })) return true
129+ if (queue.addLastIfPrev(closed, { it !is ReceiveOrClosed <* > })) {
130+ afterClose(cause)
131+ return true
132+ }
130133 continue // retry on failure
131134 }
132135 if (receive is Closed <* >) return false // already marked as closed -- nothing to do
@@ -135,6 +138,11 @@ public abstract class AbstractChannel<E> : Channel<E> {
135138 }
136139 }
137140
141+ /* *
142+ * Invoked after successful [close].
143+ */
144+ protected open fun afterClose (cause : Throwable ? ) {}
145+
138146 /* *
139147 * Retrieves first receiving waiter from the queue or returns closed token.
140148 */
@@ -143,11 +151,11 @@ public abstract class AbstractChannel<E> : Channel<E> {
143151
144152 // ------ ReceiveChannel ------
145153
146- override val isClosedForReceive: Boolean get() = closedForReceive != null && isBufferEmpty
147- override val isEmpty: Boolean get() = queue.next() !is Send && isBufferEmpty
154+ public final override val isClosedForReceive: Boolean get() = closedForReceive != null && isBufferEmpty
155+ public final override val isEmpty: Boolean get() = queue.next() !is Send && isBufferEmpty
148156
149157 @Suppress(" UNCHECKED_CAST" )
150- suspend override fun receive (): E {
158+ public final override suspend fun receive (): E {
151159 // fast path -- try poll non-blocking
152160 val result = pollInternal()
153161 if (result != = POLL_EMPTY ) return receiveResult(result)
@@ -167,7 +175,7 @@ public abstract class AbstractChannel<E> : Channel<E> {
167175 while (true ) {
168176 if (enqueueReceive(receive)) {
169177 cont.initCancellability() // make it properly cancellable
170- cont.removeOnCompletion( receive)
178+ removeReceiveOnCancel(cont, receive)
171179 return @sc
172180 }
173181 // hm... something is not right. try to poll
@@ -183,14 +191,16 @@ public abstract class AbstractChannel<E> : Channel<E> {
183191 }
184192 }
185193
186- private fun enqueueReceive (receive : Receive <E >) =
187- if (hasBuffer)
188- queue.addLastIfPrevAndIf(receive, { it !is Send }, { isBufferEmpty })
189- else
194+ private fun enqueueReceive (receive : Receive <E >): Boolean {
195+ val result = if (hasBuffer)
196+ queue.addLastIfPrevAndIf(receive, { it !is Send }, { isBufferEmpty }) else
190197 queue.addLastIfPrev(receive, { it !is Send })
198+ if (result) onEnqueuedReceive()
199+ return result
200+ }
191201
192202 @Suppress(" UNCHECKED_CAST" )
193- suspend override fun receiveOrNull (): E ? {
203+ public final override suspend fun receiveOrNull (): E ? {
194204 // fast path -- try poll non-blocking
195205 val result = pollInternal()
196206 if (result != = POLL_EMPTY ) return receiveOrNullResult(result)
@@ -213,7 +223,7 @@ public abstract class AbstractChannel<E> : Channel<E> {
213223 while (true ) {
214224 if (enqueueReceive(receive)) {
215225 cont.initCancellability() // make it properly cancellable
216- cont.removeOnCompletion( receive)
226+ removeReceiveOnCancel(cont, receive)
217227 return @sc
218228 }
219229 // hm... something is not right. try to poll
@@ -233,12 +243,12 @@ public abstract class AbstractChannel<E> : Channel<E> {
233243 }
234244
235245 @Suppress(" UNCHECKED_CAST" )
236- override fun poll (): E ? {
246+ public final override fun poll (): E ? {
237247 val result = pollInternal()
238248 return if (result == = POLL_EMPTY ) null else receiveOrNullResult(result)
239249 }
240250
241- override fun iterator (): ChannelIterator <E > = Iterator (this )
251+ public final override fun iterator (): ChannelIterator <E > = Iterator (this )
242252
243253 /* *
244254 * Retrieves first sending waiter from the queue or returns closed token.
@@ -262,6 +272,23 @@ public abstract class AbstractChannel<E> : Channel<E> {
262272 override fun toString (): String = string
263273 }
264274
275+ private fun removeReceiveOnCancel (cont : CancellableContinuation <* >, receive : Receive <* >) {
276+ cont.onCompletion {
277+ if (cont.isCancelled && receive.remove())
278+ onCancelledReceive()
279+ }
280+ }
281+
282+ /* *
283+ * Invoked when receiver is successfully enqueued to the queue of waiting receivers.
284+ */
285+ protected open fun onEnqueuedReceive () {}
286+
287+ /* *
288+ * Invoked when enqueued receiver was successfully cancelled.
289+ */
290+ protected open fun onCancelledReceive () {}
291+
265292 private class Iterator <E >(val channel : AbstractChannel <E >) : ChannelIterator<E> {
266293 var result: Any? = POLL_EMPTY // E | POLL_CLOSED | POLL_EMPTY
267294
@@ -288,7 +315,7 @@ public abstract class AbstractChannel<E> : Channel<E> {
288315 while (true ) {
289316 if (channel.enqueueReceive(receive)) {
290317 cont.initCancellability() // make it properly cancellable
291- cont.removeOnCompletion( receive)
318+ channel.removeReceiveOnCancel(cont, receive)
292319 return @sc
293320 }
294321 // hm... something is not right. try to poll
0 commit comments