@@ -98,22 +98,40 @@ public abstract class AbstractSendChannel<E> : SendChannel<E> {
9898 queue.removeFirstIfIsInstanceOfOrPeekIf<Send > { it is Closed <* > }
9999
100100 /* *
101+ * Queues buffered element, returns null on success or
102+ * returns node reference if it was already closed or is waiting for receive.
101103 * @suppress **This is unstable API and it is subject to change.**
102104 */
103- protected fun sendBuffered (element : E ): Boolean =
104- queue.addLastIfPrev(SendBuffered (element), { it !is ReceiveOrClosed <* > })
105+ protected fun sendBuffered (element : E ): ReceiveOrClosed <* >? {
106+ queue.addLastIfPrev(SendBuffered (element), { prev ->
107+ if (prev is ReceiveOrClosed <* >) return @sendBuffered prev
108+ true
109+ })
110+ return null
111+ }
105112
106113 /* *
114+ * Queues conflated element, returns null on success or
115+ * returns node reference if it was already closed or is waiting for receive.
107116 * @suppress **This is unstable API and it is subject to change.**
108117 */
109- protected fun sendConflated (element : E ): Boolean {
118+ protected fun sendConflated (element : E ): ReceiveOrClosed < * > ? {
110119 val node = SendBuffered (element)
111- if (! queue.addLastIfPrev(node, { it !is ReceiveOrClosed <* > })) return false
112- // remove previous SendBuffered
120+ queue.addLastIfPrev(node, { prev ->
121+ if (prev is ReceiveOrClosed <* >) return @sendConflated prev
122+ true
123+ })
124+ conflatePreviousSendBuffered(node)
125+ return null
126+ }
127+
128+ /* *
129+ * @suppress **This is unstable API and it is subject to change.**
130+ */
131+ protected fun conflatePreviousSendBuffered (node : LockFreeLinkedListNode ) {
113132 val prev = node.prev
114133 if (prev is SendBuffered <* >)
115134 prev.remove()
116- return true
117135 }
118136
119137 /* *
@@ -173,32 +191,56 @@ public abstract class AbstractSendChannel<E> : SendChannel<E> {
173191 private suspend fun sendSuspend (element : E ): Unit = suspendAtomicCancellableCoroutine(holdCancellability = true ) sc@ { cont ->
174192 val send = SendElement (element, cont)
175193 loop@ while (true ) {
176- if (enqueueSend(send)) {
177- cont.initCancellability() // make it properly cancellable
178- cont.removeOnCancel(send)
179- return @sc
194+ val enqueueResult = enqueueSend(send)
195+ when (enqueueResult) {
196+ null -> { // enqueued successfully
197+ cont.initCancellability() // make it properly cancellable
198+ cont.removeOnCancel(send)
199+ return @sc
200+ }
201+ is Closed <* > -> {
202+ cont.resumeWithException(enqueueResult.sendException)
203+ return @sc
204+ }
180205 }
181- // hm... something is not right . try to offer
182- val result = offerInternal(element)
206+ // hm... receiver is waiting or buffer is not full . try to offer
207+ val offerResult = offerInternal(element)
183208 when {
184- result == = OFFER_SUCCESS -> {
209+ offerResult == = OFFER_SUCCESS -> {
185210 cont.resume(Unit )
186211 return @sc
187212 }
188- result == = OFFER_FAILED -> continue @loop
189- result is Closed <* > -> {
190- cont.resumeWithException(result .sendException)
213+ offerResult == = OFFER_FAILED -> continue @loop
214+ offerResult is Closed <* > -> {
215+ cont.resumeWithException(offerResult .sendException)
191216 return @sc
192217 }
193- else -> error(" offerInternal returned $result " )
218+ else -> error(" offerInternal returned $offerResult " )
194219 }
195220 }
196221 }
197222
198- private fun enqueueSend (send : SendElement ) =
199- if (isBufferAlwaysFull)
200- queue.addLastIfPrev(send, { it !is ReceiveOrClosed <* > }) else
201- queue.addLastIfPrevAndIf(send, { it !is ReceiveOrClosed <* > }, { isBufferFull })
223+ /* *
224+ * Result is:
225+ * * null -- successfully enqueued
226+ * * ENQUEUE_FAILED -- buffer is not full (should not enqueue)
227+ * * ReceiveOrClosed<*> -- receiver is waiting or it is closed (should not enqueue)
228+ */
229+ private fun enqueueSend (send : SendElement ): Any? {
230+ if (isBufferAlwaysFull) {
231+ queue.addLastIfPrev(send, { prev ->
232+ if (prev is ReceiveOrClosed <* >) return @enqueueSend prev
233+ true
234+ })
235+ } else {
236+ if (! queue.addLastIfPrevAndIf(send, { prev ->
237+ if (prev is ReceiveOrClosed <* >) return @enqueueSend prev
238+ true
239+ }, { isBufferFull }))
240+ return ENQUEUE_FAILED
241+ }
242+ return null
243+ }
202244
203245 public override fun close (cause : Throwable ? ): Boolean {
204246 val closed = Closed <E >(cause)
@@ -207,6 +249,7 @@ public abstract class AbstractSendChannel<E> : SendChannel<E> {
207249 if (receive == null ) {
208250 // queue empty or has only senders -- try add last "Closed" item to the queue
209251 if (queue.addLastIfPrev(closed, { it !is ReceiveOrClosed <* > })) {
252+ onClosed(closed)
210253 afterClose(cause)
211254 return true
212255 }
@@ -218,6 +261,12 @@ public abstract class AbstractSendChannel<E> : SendChannel<E> {
218261 }
219262 }
220263
264+ /* *
265+ * Invoked when [Closed] element was just added.
266+ * @suppress **This is unstable API and it is subject to change.**
267+ */
268+ protected open fun onClosed (closed : Closed <E >) {}
269+
221270 /* *
222271 * Invoked after successful [close].
223272 */
@@ -870,8 +919,8 @@ public class Closed<in E>(
870919 override val pollResult get() = this
871920 override fun tryResumeSend (idempotent : Any? ): Any? = CLOSE_RESUMED
872921 override fun completeResumeSend (token : Any ) { check(token == = CLOSE_RESUMED ) }
873- override fun tryResumeReceive (value : E , idempotent : Any? ): Any? = throw sendException
874- override fun completeResumeReceive (token : Any ) = throw sendException
922+ override fun tryResumeReceive (value : E , idempotent : Any? ): Any? = CLOSE_RESUMED
923+ override fun completeResumeReceive (token : Any ) { check(token == = CLOSE_RESUMED ) }
875924 override fun toString (): String = " Closed[$closeCause ]"
876925}
877926
0 commit comments