@@ -172,34 +172,10 @@ internal open class BufferedChannel<E>(
172172 segment : ChannelSegment <E >,
173173 index : Int
174174 ) {
175- if (onUndeliveredElement == null ) {
176- invokeOnCancellation(segment, index)
177- } else {
178- when (this ) {
179- is CancellableContinuation <* > -> {
180- invokeOnCancellation(SenderWithOnUndeliveredElementCancellationHandler (segment, index, context).asHandler)
181- }
182- is SelectInstance <* > -> {
183- disposeOnCompletion(SenderWithOnUndeliveredElementCancellationHandler (segment, index, context))
184- }
185- is SendBroadcast -> {
186- cont.invokeOnCancellation(SenderWithOnUndeliveredElementCancellationHandler (segment, index, cont.context).asHandler)
187- }
188- else -> error(" unexpected sender: $this " )
189- }
190- }
191- }
192-
193- private inner class SenderWithOnUndeliveredElementCancellationHandler (
194- private val segment : ChannelSegment <E >,
195- private val index : Int ,
196- private val context : CoroutineContext
197- ) : BeforeResumeCancelHandler(), DisposableHandle {
198- override fun dispose () {
199- segment.onSenderCancellationWithOnUndeliveredElement(index, context)
200- }
201-
202- override fun invoke (cause : Throwable ? ) = dispose()
175+ // To distinguish cancelled senders and receivers,
176+ // senders equip the index value with an additional marker,
177+ // adding `SEGMENT_SIZE` to the value.
178+ invokeOnCancellation(segment, index + SEGMENT_SIZE )
203179 }
204180
205181 private fun onClosedSendOnNoWaiterSuspend (element : E , cont : CancellableContinuation <Unit >) {
@@ -1594,7 +1570,7 @@ internal open class BufferedChannel<E>(
15941570 * and [SelectInstance.trySelect]. When the channel becomes closed,
15951571 * [tryResumeHasNextOnClosedChannel] should be used instead.
15961572 */
1597- private inner class BufferedChannelIterator : ChannelIterator <E >, BeforeResumeCancelHandler (), Waiter {
1573+ private inner class BufferedChannelIterator : ChannelIterator <E >, Waiter {
15981574 /* *
15991575 * Stores the element retrieved by [hasNext] or
16001576 * a special [CHANNEL_CLOSED] token if this channel is closed.
@@ -1607,20 +1583,7 @@ internal open class BufferedChannel<E>(
16071583 * continuation. The [tryResumeHasNext] and [tryResumeHasNextOnClosedChannel]
16081584 * function resume this continuation when the [hasNext] invocation should complete.
16091585 */
1610- private var continuation: CancellableContinuation <Boolean >? = null
1611-
1612- // When `hasNext()` suspends, the location where the continuation
1613- // is stored is specified via the segment and the index in it.
1614- // We need this information in the cancellation handler below.
1615- private var segment: Segment <* >? = null
1616- private var index = - 1
1617-
1618- /* *
1619- * Invoked on cancellation, [BeforeResumeCancelHandler] implementation.
1620- */
1621- override fun invoke (cause : Throwable ? ) {
1622- segment?.onCancellation(index, null )
1623- }
1586+ private var continuation: CancellableContinuationImpl <Boolean >? = null
16241587
16251588 // `hasNext()` is just a special receive operation.
16261589 override suspend fun hasNext (): Boolean =
@@ -1680,11 +1643,7 @@ internal open class BufferedChannel<E>(
16801643 }
16811644
16821645 override fun invokeOnCancellation (segment : Segment <* >, index : Int ) {
1683- this .segment = segment
1684- this .index = index
1685- // It is possible that this `hasNext()` invocation is already
1686- // resumed, and the `continuation` field is already updated to `null`.
1687- this .continuation?.invokeOnCancellation(this .asHandler)
1646+ this .continuation?.invokeOnCancellation(segment, index)
16881647 }
16891648
16901649 private fun onClosedHasNextNoWaiterSuspend () {
@@ -2826,67 +2785,51 @@ internal class ChannelSegment<E>(id: Long, prev: ChannelSegment<E>?, channel: Bu
28262785 // # Cancellation Support #
28272786 // ########################
28282787
2829- override fun onCancellation (index : Int , cause : Throwable ? ) {
2830- onCancellation(index)
2831- }
2832-
2833- fun onSenderCancellationWithOnUndeliveredElement (index : Int , context : CoroutineContext ) {
2834- // Read the element first. If the operation has not been successfully resumed
2835- // (this cancellation may be caused by prompt cancellation during dispatching),
2836- // it is guaranteed that the element is presented.
2788+ override fun onCancellation (index : Int , cause : Throwable ? , context : CoroutineContext ) {
2789+ // To distinguish cancelled senders and receivers, senders equip the index value with
2790+ // an additional marker, adding `SEGMENT_SIZE` to the value.
2791+ val isSender = index >= SEGMENT_SIZE
2792+ // Unwrap the index.
2793+ @Suppress(" NAME_SHADOWING" ) val index = if (isSender) index - SEGMENT_SIZE else index
2794+ // Read the element, which may be needed further to call `onUndeliveredElement`.
28372795 val element = getElement(index)
2838- // Perform the cancellation; `onCancellationImpl(..)` return `true` if the
2839- // cancelled operation had not been resumed. In this case, the `onUndeliveredElement`
2840- // lambda should be called.
2841- if (onCancellation(index)) {
2842- channel.onUndeliveredElement!! .callUndeliveredElement(element, context)
2843- }
2844- }
2845-
2846- /* *
2847- * Returns `true` if the request is successfully cancelled,
2848- * and no rendezvous has happened. We need this knowledge
2849- * to keep [BufferedChannel.onUndeliveredElement] correct.
2850- */
2851- @Suppress(" ConvertTwoComparisonsToRangeCheck" )
2852- fun onCancellation (index : Int ): Boolean {
2853- // Count the global index of this cell and read
2854- // the current counters of send and receive operations.
2855- val globalIndex = id * SEGMENT_SIZE + index
2856- val s = channel.sendersCounter
2857- val r = channel.receiversCounter
2858- // Update the cell state trying to distinguish whether
2859- // the cancelled coroutine is sender or receiver.
2860- var isSender: Boolean
2861- var isReceiver: Boolean
2862- while (true ) { // CAS-loop
2796+ // Update the cell state.
2797+ while (true ) {
2798+ // CAS-loop
28632799 // Read the current state of the cell.
2864- val cur = data[ index * 2 + 1 ].value
2800+ val cur = getState( index)
28652801 when {
28662802 // The cell stores a waiter.
28672803 cur is Waiter || cur is WaiterEB -> {
2868- // Is the cancelled request send for sure?
2869- isSender = globalIndex < s && globalIndex >= r
2870- // Is the cancelled request receiver for sure?
2871- isReceiver = globalIndex < r && globalIndex >= s
2872- // If the cancelled coroutine neither sender
2873- // nor receiver, clean the element slot and finish.
2874- // An opposite operation will resume this request
2875- // and update the cell state eventually.
2876- if (! isSender && ! isReceiver) {
2877- cleanElement(index)
2878- return true
2879- }
28802804 // The cancelled request is either send or receive.
28812805 // Update the cell state correspondingly.
28822806 val update = if (isSender) INTERRUPTED_SEND else INTERRUPTED_RCV
2883- if (data[index * 2 + 1 ].compareAndSet(cur, update)) break
2807+ if (casState(index, cur, update)) {
2808+ // The waiter has been successfully cancelled.
2809+ // Clean the element slot and invoke `onSlotCleaned()`,
2810+ // which may cause deleting the whole segment from the linked list.
2811+ // In case the cancelled request is receiver, it is critical to ensure
2812+ // that the `expandBuffer()` attempt that processes this cell is completed,
2813+ // so `onCancelledRequest(..)` waits for its completion before invoking `onSlotCleaned()`.
2814+ cleanElement(index)
2815+ onCancelledRequest(index, ! isSender)
2816+ // Call `onUndeliveredElement` if needed.
2817+ if (isSender) {
2818+ channel.onUndeliveredElement?.callUndeliveredElement(element, context)
2819+ }
2820+ return
2821+ }
28842822 }
28852823 // The cell already indicates that the operation is cancelled.
28862824 cur == = INTERRUPTED_SEND || cur == = INTERRUPTED_RCV -> {
2887- // Clean the element slot to avoid memory leaks and finish.
2825+ // Clean the element slot to avoid memory leaks,
2826+ // invoke `onUndeliveredElement` if needed, and finish
28882827 cleanElement(index)
2889- return true
2828+ // Call `onUndeliveredElement` if needed.
2829+ if (isSender) {
2830+ channel.onUndeliveredElement?.callUndeliveredElement(element, context)
2831+ }
2832+ return
28902833 }
28912834 // An opposite operation is resuming this request;
28922835 // wait until the cell state updates.
@@ -2897,23 +2840,13 @@ internal class ChannelSegment<E>(id: Long, prev: ChannelSegment<E>?, channel: Bu
28972840 cur == = RESUMING_BY_EB || cur == = RESUMING_BY_RCV -> continue
28982841 // This request was successfully resumed, so this cancellation
28992842 // is caused by the prompt cancellation feature and should be ignored.
2900- cur == = DONE_RCV || cur == = BUFFERED -> return false
2843+ cur == = DONE_RCV || cur == = BUFFERED -> return
29012844 // The cell state indicates that the channel is closed;
29022845 // this cancellation should be ignored.
2903- cur == = CHANNEL_CLOSED -> {
2904- return false
2905- }
2846+ cur == = CHANNEL_CLOSED -> return
29062847 else -> error(" unexpected state: $cur " )
29072848 }
29082849 }
2909- // Clean the element slot and invoke `onSlotCleaned()`,
2910- // which may cause deleting the whole segment from the linked list.
2911- // In case the cancelled request is receiver, it is critical to ensure
2912- // that the `expandBuffer()` attempt that processes this cell is completed,
2913- // so `onCancelledRequest(..)` waits for its completion before invoking `onSlotCleaned()`.
2914- cleanElement(index)
2915- onCancelledRequest(index, isReceiver)
2916- return true
29172850 }
29182851
29192852 /* *
0 commit comments