|
16 | 16 | package org.msgpack.core.buffer |
17 | 17 |
|
18 | 18 | import java.io._ |
| 19 | +import java.net.{InetSocketAddress, ServerSocket, Socket} |
19 | 20 | import java.nio.ByteBuffer |
| 21 | +import java.nio.channels.{ServerSocketChannel, SocketChannel} |
| 22 | +import java.util.concurrent |
| 23 | +import java.util.concurrent.{Callable, Executors, TimeUnit} |
20 | 24 | import java.util.zip.{GZIPInputStream, GZIPOutputStream} |
21 | 25 |
|
22 | 26 | import org.msgpack.core.{MessagePack, MessagePackSpec, MessageUnpacker} |
23 | 27 | import xerial.core.io.IOUtil._ |
24 | 28 |
|
| 29 | +import scala.concurrent.Future |
25 | 30 | import scala.util.Random |
26 | 31 |
|
27 | 32 | class MessageBufferInputTest |
@@ -201,5 +206,44 @@ class MessageBufferInputTest |
201 | 206 | buf.reset(in1) |
202 | 207 | readInt(buf) shouldBe 42 |
203 | 208 | } |
| 209 | + |
| 210 | + "unpack without blocking" in { |
| 211 | + val server = ServerSocketChannel.open.bind(new InetSocketAddress("localhost", 0)) |
| 212 | + val executorService = Executors.newCachedThreadPool |
| 213 | + |
| 214 | + try { |
| 215 | + executorService.execute(new Runnable { |
| 216 | + override def run { |
| 217 | + val server_ch = server.accept |
| 218 | + val packer = MessagePack.newDefaultPacker(server_ch) |
| 219 | + packer.packString("0123456789") |
| 220 | + packer.flush |
| 221 | + // Keep the connection open |
| 222 | + while (!executorService.isShutdown) { |
| 223 | + TimeUnit.SECONDS.sleep(1) |
| 224 | + } |
| 225 | + packer.close |
| 226 | + } |
| 227 | + }) |
| 228 | + |
| 229 | + val future = executorService.submit(new Callable[String] { |
| 230 | + override def call: String = { |
| 231 | + val conn_ch = SocketChannel.open(new InetSocketAddress("localhost", server.socket.getLocalPort)) |
| 232 | + val unpacker = MessagePack.newDefaultUnpacker(conn_ch) |
| 233 | + val s = unpacker.unpackString |
| 234 | + unpacker.close |
| 235 | + s |
| 236 | + } |
| 237 | + }) |
| 238 | + |
| 239 | + future.get(5, TimeUnit.SECONDS) shouldBe "0123456789" |
| 240 | + } |
| 241 | + finally { |
| 242 | + executorService.shutdown |
| 243 | + if (!executorService.awaitTermination(5, TimeUnit.SECONDS)) { |
| 244 | + executorService.shutdownNow |
| 245 | + } |
| 246 | + } |
| 247 | + } |
204 | 248 | } |
205 | 249 | } |
0 commit comments