@@ -135,7 +135,8 @@ internal class ByteBufferChannel(
135135 }
136136
137137 if (avr >= minReadSize) resumeReadOp()
138- if (avw >= minWriteSize) resumeWriteOp()
138+ val joining = joining
139+ if (avw >= minWriteSize && (joining == null || state == = ReadWriteBufferState .Terminated )) resumeWriteOp()
139140 }
140141
141142 override fun flush () {
@@ -1215,7 +1216,7 @@ internal class ByteBufferChannel(
12151216
12161217 var partSize = 0
12171218
1218- val rc = src.reading { srcState ->
1219+ src.reading { srcState ->
12191220 val srcBuffer = this
12201221
12211222 val rem = minOf(srcBuffer.remaining().toLong(), dstBuffer.remaining().toLong(), limit - copied).toInt()
@@ -1236,7 +1237,7 @@ internal class ByteBufferChannel(
12361237 true
12371238 }
12381239
1239- if (rc ) {
1240+ if (partSize > 0 ) {
12401241 dstBuffer.bytesWritten(state, partSize)
12411242 copied + = partSize
12421243
@@ -1266,7 +1267,7 @@ internal class ByteBufferChannel(
12661267 }
12671268
12681269 if (joining != null ) {
1269- yield ( )
1270+ tryWriteSuspend( 1 )
12701271 }
12711272 }
12721273
@@ -2038,16 +2039,17 @@ internal class ByteBufferChannel(
20382039 }
20392040
20402041 private fun resumeWriteOp () {
2041- WriteOp .getAndSet(this , null )?.apply {
2042+ while (true ) {
2043+ val writeOp = writeOp ? : return
20422044 val closed = closed
2043- if (closed == null ) resume(Unit ) else resumeWithException(closed.sendException)
2045+ if (closed == null && joining != null && state != = ReadWriteBufferState .Terminated ) return
2046+ if (WriteOp .compareAndSet(this , writeOp, null )) {
2047+ if (closed == null ) writeOp.resume(Unit ) else writeOp.resumeWithException(closed.sendException)
2048+ return
2049+ }
20442050 }
20452051 }
20462052
2047- private fun resumeWriteOp (cause : Throwable ) {
2048- WriteOp .getAndSet(this , null )?.resumeWithException(cause)
2049- }
2050-
20512053 private fun resumeClosed (cause : Throwable ? ) {
20522054 ReadOp .getAndSet(this , null )?.let { c ->
20532055 if (cause != null )
0 commit comments