@@ -131,17 +131,28 @@ internal class ByteBufferChannel(
131131 var _allocated : ReadWriteBufferState .Initial ? = null
132132 val (old, newState) = updateState { state ->
133133 when {
134- joining != null -> return null
135- closed != null -> throw closed!! .sendException
134+ joining != null -> {
135+ _allocated ?.let { releaseBuffer(it) }
136+ return null
137+ }
138+ closed != null -> {
139+ _allocated ?.let { releaseBuffer(it) }
140+ throw closed!! .sendException
141+ }
136142 state == = ReadWriteBufferState .IdleEmpty -> {
137143 val allocated = _allocated ? : newBuffer().also { _allocated = it }
138144 allocated.startWriting()
139145 }
140- state == = ReadWriteBufferState .Terminated -> throw closed!! .sendException
146+ state == = ReadWriteBufferState .Terminated -> {
147+ _allocated ?.let { releaseBuffer(it) }
148+ if (joining != null ) return null
149+ throw closed!! .sendException
150+ }
141151 else -> state.startWriting()
142152 }
143153 }
144154
155+ // joining?.let { restoreStateAfterWrite(); return null }
145156 if (closed != null ) {
146157 restoreStateAfterWrite()
147158 tryTerminate()
@@ -247,34 +258,44 @@ internal class ByteBufferChannel(
247258 }
248259
249260 private fun tryCompleteJoining (joined : JoiningState ): Boolean {
250- updateState { state ->
251- when {
252- state == = ReadWriteBufferState .Terminated -> state
253- state == = ReadWriteBufferState .IdleEmpty -> ReadWriteBufferState .Terminated
254- // we don't handle IdleNonEmpty as it should be switched to IdleEmpty in restoreStateAfterRead
255- else -> return false
256- }
257- }
258-
261+ if (! tryReleaseBuffer()) return false
259262 ensureClosedJoined(joined)
260263
261- ReadOp .getAndSet( this , null )?.resumeWithException (IllegalStateException (" Joining is in progress" ))
262- WriteOp .getAndSet( this , null )?. resume( Unit )
264+ resumeReadOp (IllegalStateException (" Joining is in progress" ))
265+ resumeWriteOp() // here we don't resume it with exception because it should resume and delegate writing
263266
264267 return true
265268 }
266269
267270 private fun tryTerminate (): Boolean {
268- val closed = closed ? : return false
271+ if (closed == null ) return false
272+
273+ if (! tryReleaseBuffer()) return false
274+
275+ joining?.let { ensureClosedJoined(it) }
276+
277+ resumeReadOp()
278+ resumeWriteOp()
279+
280+ return true
281+ }
269282
283+ private fun tryReleaseBuffer (): Boolean {
270284 var toRelease: ReadWriteBufferState .Initial ? = null
271285
272286 updateState { state ->
287+ toRelease?.let { buffer ->
288+ toRelease = null
289+ buffer.capacity.resetForWrite()
290+ resumeWriteOp()
291+ }
292+ val closed = closed
293+
273294 when {
274295 state == = ReadWriteBufferState .Terminated -> return true
275296 state == = ReadWriteBufferState .IdleEmpty -> ReadWriteBufferState .Terminated
276- closed.cause != null && state is ReadWriteBufferState .IdleNonEmpty -> {
277- // here we don't need to tryLockForRelease as we already have closed state
297+ closed != null && state is ReadWriteBufferState .IdleNonEmpty && (state.capacity.tryLockForRelease() || closed.cause != null ) -> {
298+ if (closed.cause != null ) state.capacity.forceLockForRelease()
278299 toRelease = state.initial
279300 ReadWriteBufferState .Terminated
280301 }
@@ -288,13 +309,6 @@ internal class ByteBufferChannel(
288309 }
289310 }
290311
291- joining?.let { ensureClosedJoined(it) }
292-
293- WriteOp .getAndSet(this , null )?.resumeWithException(closed.sendException)
294- ReadOp .getAndSet(this , null )?.apply {
295- if (closed.cause != null ) resumeWithException(closed.cause) else resume(false )
296- }
297-
298312 return true
299313 }
300314
@@ -1795,7 +1809,17 @@ internal class ByteBufferChannel(
17951809 suspend override fun <A : Appendable > readUTF8LineTo (out : A , limit : Int ) = readUTF8LineToAscii(out , limit)
17961810
17971811 private fun resumeReadOp () {
1798- ReadOp .getAndSet(this , null )?.resume(true )
1812+ ReadOp .getAndSet(this , null )?.apply {
1813+ val closedCause = closed?.cause
1814+ when {
1815+ closedCause != null -> resumeWithException(closedCause)
1816+ else -> resume(true )
1817+ }
1818+ }
1819+ }
1820+
1821+ private fun resumeReadOp (result : Throwable ) {
1822+ ReadOp .getAndSet(this , null )?.resumeWithException(result)
17991823 }
18001824
18011825 private fun resumeWriteOp () {
@@ -1805,6 +1829,10 @@ internal class ByteBufferChannel(
18051829 }
18061830 }
18071831
1832+ private fun resumeWriteOp (cause : Throwable ) {
1833+ WriteOp .getAndSet(this , null )?.resumeWithException(cause)
1834+ }
1835+
18081836 private fun resumeClosed (cause : Throwable ? ) {
18091837 ReadOp .getAndSet(this , null )?.let { c ->
18101838 if (cause != null )
0 commit comments