@@ -142,7 +142,6 @@ internal class ByteBufferChannel(
142142 }
143143 }
144144
145- joining?.let { restoreStateAfterWrite(); tryCompleteJoining(it); return null }
146145 if (closed != null ) {
147146 restoreStateAfterWrite()
148147 tryTerminate()
@@ -248,44 +247,34 @@ internal class ByteBufferChannel(
248247 }
249248
250249 private fun tryCompleteJoining (joined : JoiningState ): Boolean {
251- if (! tryReleaseBuffer()) return false
250+ updateState { state ->
251+ when {
252+ state == = ReadWriteBufferState .Terminated -> state
253+ state == = ReadWriteBufferState .IdleEmpty -> ReadWriteBufferState .Terminated
254+ // we don't handle IdleNonEmpty as it should be switched to IdleEmpty in restoreStateAfterRead
255+ else -> return false
256+ }
257+ }
258+
252259 ensureClosedJoined(joined)
253260
254- resumeReadOp (IllegalStateException (" Joining is in progress" ))
255- resumeWriteOp() // here we don't resume it with exception because it should resume and delegate writing
261+ ReadOp .getAndSet( this , null )?.resumeWithException (IllegalStateException (" Joining is in progress" ))
262+ WriteOp .getAndSet( this , null )?. resume( Unit )
256263
257264 return true
258265 }
259266
260267 private fun tryTerminate (): Boolean {
261- if ( closed == null ) return false
268+ val closed = closed ? : return false
262269
263- if (! tryReleaseBuffer()) return false
264-
265- joining?.let { ensureClosedJoined(it) }
266-
267- resumeReadOp()
268- resumeWriteOp()
269-
270- return true
271- }
272-
273- private fun tryReleaseBuffer (): Boolean {
274270 var toRelease: ReadWriteBufferState .Initial ? = null
275271
276272 updateState { state ->
277- toRelease?.let { buffer ->
278- toRelease = null
279- buffer.capacity.resetForWrite()
280- resumeWriteOp()
281- }
282- val closed = closed
283-
284273 when {
285274 state == = ReadWriteBufferState .Terminated -> return true
286275 state == = ReadWriteBufferState .IdleEmpty -> ReadWriteBufferState .Terminated
287- closed != null && state is ReadWriteBufferState .IdleNonEmpty && (state.capacity.tryLockForRelease() || closed.cause != null ) -> {
288- if (closed.cause != null ) state.capacity.forceLockForRelease()
276+ closed.cause != null && state is ReadWriteBufferState .IdleNonEmpty -> {
277+ // here we don't need to tryLockForRelease as we already have closed state
289278 toRelease = state.initial
290279 ReadWriteBufferState .Terminated
291280 }
@@ -299,6 +288,13 @@ internal class ByteBufferChannel(
299288 }
300289 }
301290
291+ joining?.let { ensureClosedJoined(it) }
292+
293+ WriteOp .getAndSet(this , null )?.resumeWithException(closed.sendException)
294+ ReadOp .getAndSet(this , null )?.apply {
295+ if (closed.cause != null ) resumeWithException(closed.cause) else resume(false )
296+ }
297+
302298 return true
303299 }
304300
@@ -1799,17 +1795,7 @@ internal class ByteBufferChannel(
17991795 suspend override fun <A : Appendable > readUTF8LineTo (out : A , limit : Int ) = readUTF8LineToAscii(out , limit)
18001796
18011797 private fun resumeReadOp () {
1802- ReadOp .getAndSet(this , null )?.apply {
1803- val closedCause = closed?.cause
1804- when {
1805- closedCause != null -> resumeWithException(closedCause)
1806- else -> resume(true )
1807- }
1808- }
1809- }
1810-
1811- private fun resumeReadOp (result : Throwable ) {
1812- ReadOp .getAndSet(this , null )?.resumeWithException(result)
1798+ ReadOp .getAndSet(this , null )?.resume(true )
18131799 }
18141800
18151801 private fun resumeWriteOp () {
@@ -1819,10 +1805,6 @@ internal class ByteBufferChannel(
18191805 }
18201806 }
18211807
1822- private fun resumeWriteOp (cause : Throwable ) {
1823- WriteOp .getAndSet(this , null )?.resumeWithException(cause)
1824- }
1825-
18261808 private fun resumeClosed (cause : Throwable ? ) {
18271809 ReadOp .getAndSet(this , null )?.let { c ->
18281810 if (cause != null )
0 commit comments