@@ -23,66 +23,141 @@ import kotlinx.coroutines.experimental.selects.SelectInstance
2323import kotlin.coroutines.experimental.startCoroutine
2424
2525/* *
26- * Abstract channel. It is a base class for buffered and unbuffered channels.
27- *
28- * @suppress **This is unstable API and it is subject to change.**
26+ * Abstract channel. It is a base class for all channel implementations.
2927 */
3028public abstract class AbstractChannel <E > : Channel <E > {
3129 private val queue = LockFreeLinkedListHead ()
3230
3331 // ------ extension points for buffered channels ------
3432
3533 /* *
36- * Returns `true` if this channel has buffer.
34+ * Returns `true` if [isBufferEmpty] is always `true`.
35+ * @suppress **This is unstable API and it is subject to change.**
3736 */
38- protected abstract val hasBuffer : Boolean
37+ protected abstract val isBufferAlwaysEmpty : Boolean
3938
4039 /* *
4140 * Returns `true` if this channel's buffer is empty.
41+ * @suppress **This is unstable API and it is subject to change.**
4242 */
4343 protected abstract val isBufferEmpty: Boolean
4444
45+ /* *
46+ * Returns `true` if [isBufferFull] is always `true`.
47+ * @suppress **This is unstable API and it is subject to change.**
48+ */
49+ protected abstract val isBufferAlwaysFull: Boolean
50+
4551 /* *
4652 * Returns `true` if this channel's buffer is full.
53+ * @suppress **This is unstable API and it is subject to change.**
4754 */
4855 protected abstract val isBufferFull: Boolean
4956
57+ // ------ internal functions for override by buffered channels ------
58+
5059 /* *
5160 * Tries to add element to buffer or to queued receiver.
5261 * Return type is `OFFER_SUCCESS | OFFER_FAILED | Closed`.
62+ * @suppress **This is unstable API and it is subject to change.**
5363 */
54- protected abstract fun offerInternal (element : E ): Any
64+ protected open fun offerInternal (element : E ): Any {
65+ while (true ) {
66+ val receive = takeFirstReceiveOrPeekClosed() ? : return OFFER_FAILED
67+ val token = receive.tryResumeReceive(element, idempotent = null )
68+ if (token != null ) {
69+ receive.completeResumeReceive(token)
70+ return receive.offerResult
71+ }
72+ }
73+ }
5574
5675 /* *
5776 * Tries to add element to buffer or to queued receiver if select statement clause was not selected yet.
5877 * Return type is `ALREADY_SELECTED | OFFER_SUCCESS | OFFER_FAILED | Closed`.
78+ * @suppress **This is unstable API and it is subject to change.**
5979 */
60- protected abstract fun offerSelectInternal (element : E , select : SelectInstance <* >): Any
80+ protected open fun offerSelectInternal (element : E , select : SelectInstance <* >): Any {
81+ // offer atomically with select
82+ val offerOp = describeTryOffer(element)
83+ val failure = select.performAtomicTrySelect(offerOp)
84+ if (failure != null ) return failure
85+ val receive = offerOp.result
86+ receive.completeResumeReceive(offerOp.resumeToken!! )
87+ return receive.offerResult
88+ }
6189
6290 /* *
6391 * Tries to remove element from buffer or from queued sender.
6492 * Return type is `E | POLL_FAILED | Closed`
93+ * @suppress **This is unstable API and it is subject to change.**
6594 */
66- protected abstract fun pollInternal (): Any?
95+ protected open fun pollInternal (): Any? {
96+ while (true ) {
97+ val send = takeFirstSendOrPeekClosed() ? : return POLL_FAILED
98+ val token = send.tryResumeSend(idempotent = null )
99+ if (token != null ) {
100+ send.completeResumeSend(token)
101+ return send.pollResult
102+ }
103+ }
104+ }
67105
68106 /* *
69107 * Tries to remove element from buffer or from queued sender if select statement clause was not selected yet.
70108 * Return type is `ALREADY_SELECTED | E | POLL_FAILED | Closed`
109+ * @suppress **This is unstable API and it is subject to change.**
71110 */
72- protected abstract fun pollSelectInternal (select : SelectInstance <* >): Any?
111+ protected open fun pollSelectInternal (select : SelectInstance <* >): Any? {
112+ // poll atomically with select
113+ val pollOp = describeTryPoll()
114+ val failure = select.performAtomicTrySelect(pollOp)
115+ if (failure != null ) return failure
116+ val send = pollOp.result
117+ send.completeResumeSend(pollOp.resumeToken!! )
118+ return pollOp.pollResult
119+ }
73120
74- // ------ state functions for concrete implementations ------
121+ // ------ state functions & helpers for concrete implementations ------
75122
76123 /* *
77124 * Returns non-null closed token if it is first in the queue.
125+ * @suppress **This is unstable API and it is subject to change.**
78126 */
79127 protected val closedForReceive: Any? get() = queue.next as ? Closed <* >
80128
81129 /* *
82130 * Returns non-null closed token if it is last in the queue.
131+ * @suppress **This is unstable API and it is subject to change.**
83132 */
84133 protected val closedForSend: ReceiveOrClosed <* >? get() = queue.prev as ? Closed <* >
85134
135+ /* *
136+ * @suppress **This is unstable API and it is subject to change.**
137+ */
138+ protected val hasReceiveOrClosed: Boolean get() = queue.next is ReceiveOrClosed <* >
139+
140+ /* *
141+ * @suppress **This is unstable API and it is subject to change.**
142+ */
143+ protected fun sendBuffered (element : E ): Boolean =
144+ queue.addLastIfPrev(SendBuffered (element), { it !is ReceiveOrClosed <* > })
145+
146+ /* *
147+ * @suppress **This is unstable API and it is subject to change.**
148+ */
149+ protected fun describeSendBuffered (element : E ): AddLastDesc <* > = SendBufferedDesc (queue, element)
150+
151+ private class SendBufferedDesc <E >(
152+ queue : LockFreeLinkedListHead ,
153+ element : E
154+ ) : AddLastDesc<SendBuffered<E>>(queue, SendBuffered (element)) {
155+ override fun failure (affected : LockFreeLinkedListNode , next : Any ): Any? {
156+ if (affected is ReceiveOrClosed <* >) return OFFER_FAILED
157+ return null
158+ }
159+ }
160+
86161 // ------ SendChannel ------
87162
88163 public final override val isClosedForSend: Boolean get() = closedForSend != null
@@ -131,10 +206,9 @@ public abstract class AbstractChannel<E> : Channel<E> {
131206 }
132207
133208 private fun enqueueSend (send : SendElement ) =
134- if (hasBuffer)
209+ if (isBufferAlwaysFull)
210+ queue.addLastIfPrev(send, { it !is ReceiveOrClosed <* > }) else
135211 queue.addLastIfPrevAndIf(send, { it !is ReceiveOrClosed <* > }, { isBufferFull })
136- else
137- queue.addLastIfPrev(send, { it !is ReceiveOrClosed <* > })
138212
139213 public final override fun close (cause : Throwable ? ): Boolean {
140214 val closed = Closed <E >(cause)
@@ -161,14 +235,21 @@ public abstract class AbstractChannel<E> : Channel<E> {
161235
162236 /* *
163237 * Retrieves first receiving waiter from the queue or returns closed token.
238+ * @suppress **This is unstable API and it is subject to change.**
164239 */
165240 protected fun takeFirstReceiveOrPeekClosed (): ReceiveOrClosed <E >? =
166241 queue.removeFirstIfIsInstanceOfOrPeekIf<ReceiveOrClosed <E >>({ it is Closed <* > })
167242
168243 // ------ registerSelectSend ------
169244
245+ /* *
246+ * @suppress **This is unstable API and it is subject to change.**
247+ */
170248 protected fun describeTryOffer (element : E ): TryOfferDesc <E > = TryOfferDesc (element, queue)
171249
250+ /* *
251+ * @suppress **This is unstable API and it is subject to change.**
252+ */
172253 protected class TryOfferDesc <E >(
173254 @JvmField val element : E ,
174255 queue : LockFreeLinkedListHead
@@ -283,9 +364,9 @@ public abstract class AbstractChannel<E> : Channel<E> {
283364 }
284365
285366 private fun enqueueReceive (receive : Receive <E >): Boolean {
286- val result = if (hasBuffer )
287- queue.addLastIfPrevAndIf (receive, { it !is Send }, { isBufferEmpty }) else
288- queue.addLastIfPrev (receive, { it !is Send })
367+ val result = if (isBufferAlwaysEmpty )
368+ queue.addLastIfPrev (receive, { it !is Send }) else
369+ queue.addLastIfPrevAndIf (receive, { it !is Send }, { isBufferEmpty })
289370 if (result) onEnqueuedReceive()
290371 return result
291372 }
@@ -343,14 +424,21 @@ public abstract class AbstractChannel<E> : Channel<E> {
343424
344425 /* *
345426 * Retrieves first sending waiter from the queue or returns closed token.
427+ * @suppress **This is unstable API and it is subject to change.**
346428 */
347429 protected fun takeFirstSendOrPeekClosed (): Send ? =
348430 queue.removeFirstIfIsInstanceOfOrPeekIf<Send > { it is Closed <* > }
349431
350432 // ------ registerSelectReceive ------
351433
434+ /* *
435+ * @suppress **This is unstable API and it is subject to change.**
436+ */
352437 protected fun describeTryPoll (): TryPollDesc <E > = TryPollDesc (queue)
353438
439+ /* *
440+ * @suppress **This is unstable API and it is subject to change.**
441+ */
354442 protected class TryPollDesc <E >(queue : LockFreeLinkedListHead ) : RemoveFirstDesc<Send>(queue) {
355443 @JvmField var resumeToken: Any? = null
356444 @JvmField var pollResult: E ? = null
@@ -387,8 +475,10 @@ public abstract class AbstractChannel<E> : Channel<E> {
387475
388476 override fun finishOnSuccess (affected : LockFreeLinkedListNode , next : LockFreeLinkedListNode ) {
389477 super .finishOnSuccess(affected, next)
478+ // notify the there is one more receiver
479+ onEnqueuedReceive()
390480 // we can actually remove on select start, but this is also Ok (it'll get removed if discovered there)
391- ( node as ReceiveSelect < * , * >) .removeOnSelectCompletion()
481+ node.removeOnSelectCompletion()
392482 }
393483 }
394484
@@ -459,25 +549,35 @@ public abstract class AbstractChannel<E> : Channel<E> {
459549 protected companion object {
460550 private const val DEFAULT_CLOSE_MESSAGE = " Channel was closed"
461551
552+ /* * @suppress **This is unstable API and it is subject to change.** */
462553 @JvmStatic
463554 val OFFER_SUCCESS : Any = Symbol (" OFFER_SUCCESS" )
555+
556+ /* * @suppress **This is unstable API and it is subject to change.** */
464557 @JvmStatic
465558 val OFFER_FAILED : Any = Symbol (" OFFER_FAILED" )
466559
560+ /* * @suppress **This is unstable API and it is subject to change.** */
467561 @JvmStatic
468562 val POLL_FAILED : Any = Symbol (" POLL_FAILED" )
469563
564+ /* * @suppress **This is unstable API and it is subject to change.** */
470565 @JvmStatic
471566 val ENQUEUE_FAILED : Any = Symbol (" ENQUEUE_FAILED" )
472567
473568 @JvmStatic
474569 private val SELECT_STARTED : Any = Symbol (" SELECT_STARTED" )
570+
475571 @JvmStatic
476572 private val NULL_VALUE : Any = Symbol (" NULL_VALUE" )
477573
478574 @JvmStatic
479575 private val CLOSE_RESUMED : Any = Symbol (" CLOSE_RESUMED" )
480576
577+ @JvmStatic
578+ private val SEND_RESUMED = Symbol (" SEND_RESUMED" )
579+
580+ /* * @suppress **This is unstable API and it is subject to change.** */
481581 @JvmStatic
482582 fun isClosed (result : Any? ): Boolean = result is Closed <* >
483583 }
@@ -562,6 +662,7 @@ public abstract class AbstractChannel<E> : Channel<E> {
562662
563663 /* *
564664 * Represents sending waiter in the queue.
665+ * @suppress **This is unstable API and it is subject to change.**
565666 */
566667 protected interface Send {
567668 val pollResult: Any? // E | Closed
@@ -571,6 +672,7 @@ public abstract class AbstractChannel<E> : Channel<E> {
571672
572673 /* *
573674 * Represents receiver waiter in the queue or closed token.
675+ * @suppress **This is unstable API and it is subject to change.**
574676 */
575677 protected interface ReceiveOrClosed <in E > {
576678 val offerResult: Any // OFFER_SUCCESS | Closed
@@ -612,8 +714,17 @@ public abstract class AbstractChannel<E> : Channel<E> {
612714 override fun toString (): String = " SendSelect($pollResult )[$select ]"
613715 }
614716
717+ private class SendBuffered <out E >(
718+ @JvmField val element : E
719+ ) : LockFreeLinkedListNode(), Send {
720+ override val pollResult: Any? get() = element
721+ override fun tryResumeSend (idempotent : Any? ): Any? = SEND_RESUMED
722+ override fun completeResumeSend (token : Any ) { check(token == = SEND_RESUMED ) }
723+ }
724+
615725 /* *
616726 * Represents closed channel.
727+ * @suppress **This is unstable API and it is subject to change.**
617728 */
618729 protected class Closed <in E >(
619730 @JvmField val closeCause : Throwable ?
@@ -705,7 +816,7 @@ public abstract class AbstractChannel<E> : Channel<E> {
705816 override fun toString (): String = " ReceiveHasNext[$cont ]"
706817 }
707818
708- private class ReceiveSelect <R , in E >(
819+ private inner class ReceiveSelect <R , in E >(
709820 @JvmField val select : SelectInstance <R >,
710821 @JvmField val block : suspend (E ? ) -> R ,
711822 @JvmField val nullOnClose : Boolean
@@ -728,12 +839,11 @@ public abstract class AbstractChannel<E> : Channel<E> {
728839 }
729840 }
730841
731- fun removeOnSelectCompletion () {
732- select.invokeOnCompletion(this )
733- }
842+ fun removeOnSelectCompletion () { select.invokeOnCompletion(this ) }
734843
735- override fun invoke (cause : Throwable ? ) {
736- remove()
844+ override fun invoke (cause : Throwable ? ) { // invoked on select completion
845+ if (remove())
846+ onCancelledReceive() // notify cancellation of receive
737847 }
738848
739849 override fun toString (): String = " ReceiveSelect[$select ,nullOnClose=$nullOnClose ]"
0 commit comments