@@ -630,6 +630,76 @@ class ByteBufferChannelTest : TestBase() {
630630 assertEquals(" abc" , ch.readASCIILine())
631631 }
632632
633+ @Test
634+ fun testReadAndWriteLarge () = runTest {
635+ val count = 128L * 1024 * stressTestMultiplier // * 8192 -> 1G * M
636+ val data = ByteBuffer .allocate(8192 )!!
637+ Random ().nextBytes(data.array())
638+
639+ launch(" writer" ) {
640+ repeat(count.toInt()) {
641+ data.clear()
642+ ch.writeFully(data)
643+ }
644+ ch.close()
645+ }
646+
647+ launch(" reader" ) {
648+ val buffer = ByteBuffer .allocate(8192 )!!
649+ var read = 0L
650+ val total = count * 8192
651+
652+ while (read < total) {
653+ buffer.clear()
654+ val rc = ch.readFully(buffer)
655+ if (rc == - 1 ) break
656+ read + = rc
657+ }
658+
659+ assertEquals(total, read)
660+
661+ buffer.clear()
662+ assertEquals(- 1 , ch.readAvailable(buffer))
663+ }
664+ }
665+
666+ @Test
667+ fun testReadAndWriteLargeViaLookAheadSession () = runTest {
668+ val count = 128L * 1024 * stressTestMultiplier // * 8192 -> 1G * M
669+ val data = ByteBuffer .allocate(8192 )!!
670+ Random ().nextBytes(data.array())
671+
672+ launch(" writer" ) {
673+ repeat(count.toInt()) {
674+ data.clear()
675+ ch.writeFully(data)
676+ }
677+ ch.close()
678+ }
679+
680+ launch(" reader" ) {
681+ var read = 0L
682+ val total = count * 8192
683+
684+ ch.lookAheadSuspend {
685+ while (read < total) {
686+ val bb = request(0 , 1 )
687+ if (bb == null ) {
688+ if (! awaitAtLeast(1 )) break
689+ continue
690+ }
691+ val rc = bb.remaining()
692+ bb.position(bb.limit())
693+ read + = rc
694+ consumed(rc)
695+ }
696+ }
697+
698+ assertEquals(total, read)
699+ assertEquals(- 1 , ch.readAvailable(ByteBuffer .allocate(8192 )))
700+ }
701+ }
702+
633703 @Test
634704 fun testCopyLarge () {
635705 val count = 100 * 256 * stressTestMultiplier // * 8192
@@ -752,6 +822,16 @@ class ByteBufferChannelTest : TestBase() {
752822 latch.await()
753823 }
754824
825+ private fun CoroutineScope.launch (name : String = "child", block : suspend () -> Unit ): Job {
826+ return launch(context = DefaultDispatcher + CoroutineName (name), parent = coroutineContext[Job ]) {
827+ block()
828+ }.apply {
829+ invokeOnCompletion(true ) { t ->
830+ if (t != null ) ch.cancel(t)
831+ }
832+ }
833+ }
834+
755835 private fun launch (block : suspend () -> Unit ): Job {
756836 return launch(DefaultDispatcher ) {
757837 try {
0 commit comments