|
1 | 1 | package kotlinx.coroutines.experimental.nio |
2 | 2 |
|
| 3 | +import kotlinx.coroutines.experimental.CancellableContinuation |
| 4 | +import kotlinx.coroutines.experimental.CancellationException |
| 5 | +import kotlinx.coroutines.experimental.Job |
| 6 | +import kotlinx.coroutines.experimental.suspendCancellableCoroutine |
3 | 7 | import java.net.SocketAddress |
4 | 8 | import java.nio.ByteBuffer |
5 | | -import java.nio.channels.AsynchronousFileChannel |
6 | | -import java.nio.channels.AsynchronousServerSocketChannel |
7 | | -import java.nio.channels.AsynchronousSocketChannel |
8 | | -import java.nio.channels.CompletionHandler |
| 9 | +import java.nio.channels.* |
9 | 10 | import java.util.concurrent.TimeUnit |
10 | | -import kotlin.coroutines.Continuation |
11 | | -import kotlin.coroutines.suspendCoroutine |
12 | 11 |
|
| 12 | +/** |
| 13 | + * Performs [AsynchronousFileChannel.lock] without blocking a thread and resumes when asynchronous operation completes. |
| 14 | + * This suspending function is cancellable. |
| 15 | + * If the [Job] of the current coroutine is completed while this suspending function is suspended, this function |
| 16 | + * *closes the underlying channel* and immediately resumes with [CancellationException]. |
| 17 | + */ |
| 18 | +suspend fun AsynchronousFileChannel.aLock() = suspendCancellableCoroutine<FileLock> { cont -> |
| 19 | + lock(cont, asyncIOHandler()) |
| 20 | + closeOnCancel(cont) |
| 21 | +} |
| 22 | + |
| 23 | +/** |
| 24 | + * Performs [AsynchronousFileChannel.lock] without blocking a thread and resumes when asynchronous operation completes. |
| 25 | + * This suspending function is cancellable. |
| 26 | + * If the [Job] of the current coroutine is completed while this suspending function is suspended, this function |
| 27 | + * *closes the underlying channel* and immediately resumes with [CancellationException]. |
| 28 | + */ |
| 29 | +suspend fun AsynchronousFileChannel.aLock( |
| 30 | + position: Long, |
| 31 | + size: Long, |
| 32 | + shared: Boolean |
| 33 | +) = suspendCancellableCoroutine<FileLock> { cont -> |
| 34 | + lock(position, size, shared, cont, asyncIOHandler()) |
| 35 | + closeOnCancel(cont) |
| 36 | +} |
| 37 | + |
| 38 | +/** |
| 39 | + * Performs [AsynchronousFileChannel.read] without blocking a thread and resumes when asynchronous operation completes. |
| 40 | + * This suspending function is cancellable. |
| 41 | + * If the [Job] of the current coroutine is completed while this suspending function is suspended, this function |
| 42 | + * *closes the underlying channel* and immediately resumes with [CancellationException]. |
| 43 | + */ |
13 | 44 | suspend fun AsynchronousFileChannel.aRead( |
14 | 45 | buf: ByteBuffer, |
15 | 46 | position: Long |
16 | | -) = suspendCoroutine<Int> { c -> |
17 | | - this.read(buf, position, null, AsyncIOHandler(c)) |
| 47 | +) = suspendCancellableCoroutine<Int> { cont -> |
| 48 | + read(buf, position, cont, asyncIOHandler()) |
| 49 | + closeOnCancel(cont) |
18 | 50 | } |
19 | 51 |
|
| 52 | +/** |
| 53 | + * Performs [AsynchronousFileChannel.write] without blocking a thread and resumes when asynchronous operation completes. |
| 54 | + * This suspending function is cancellable. |
| 55 | + * If the [Job] of the current coroutine is completed while this suspending function is suspended, this function |
| 56 | + * *closes the underlying channel* and immediately resumes with [CancellationException]. |
| 57 | + */ |
20 | 58 | suspend fun AsynchronousFileChannel.aWrite( |
21 | 59 | buf: ByteBuffer, |
22 | 60 | position: Long |
23 | | -) = suspendCoroutine<Int> { c -> |
24 | | - this.write(buf, position, null, AsyncIOHandler(c)) |
| 61 | +) = suspendCancellableCoroutine<Int> { cont -> |
| 62 | + write(buf, position, cont, asyncIOHandler()) |
| 63 | + closeOnCancel(cont) |
25 | 64 | } |
26 | 65 |
|
27 | | -suspend fun AsynchronousServerSocketChannel.aAccept() = |
28 | | - suspendCoroutine<AsynchronousSocketChannel> { c -> |
29 | | - this.accept(null, AsyncIOHandler(c)) |
30 | | - } |
| 66 | +/** |
| 67 | + * Performs [AsynchronousServerSocketChannel.accept] without blocking a thread and resumes when asynchronous operation completes. |
| 68 | + * This suspending function is cancellable. |
| 69 | + * If the [Job] of the current coroutine is completed while this suspending function is suspended, this function |
| 70 | + * *closes the underlying channel* and immediately resumes with [CancellationException]. |
| 71 | + */ |
| 72 | +suspend fun AsynchronousServerSocketChannel.aAccept() = suspendCancellableCoroutine<AsynchronousSocketChannel> { cont -> |
| 73 | + accept(cont, asyncIOHandler()) |
| 74 | + closeOnCancel(cont) |
| 75 | +} |
31 | 76 |
|
| 77 | +/** |
| 78 | + * Performs [AsynchronousServerSocketChannel.connect] without blocking a thread and resumes when asynchronous operation completes. |
| 79 | + * This suspending function is cancellable. |
| 80 | + * If the [Job] of the current coroutine is completed while this suspending function is suspended, this function |
| 81 | + * *closes the underlying channel* and immediately resumes with [CancellationException]. |
| 82 | + */ |
32 | 83 | suspend fun AsynchronousSocketChannel.aConnect( |
33 | 84 | socketAddress: SocketAddress |
34 | | -) = suspendCoroutine<Unit> { c -> |
35 | | - this.connect(socketAddress, null, AsyncVoidIOHandler(c)) |
| 85 | +) = suspendCancellableCoroutine<Unit> { cont -> |
| 86 | + connect(socketAddress, cont, AsyncVoidIOHandler) |
| 87 | + closeOnCancel(cont) |
36 | 88 | } |
37 | 89 |
|
| 90 | +/** |
| 91 | + * Performs [AsynchronousServerSocketChannel.read] without blocking a thread and resumes when asynchronous operation completes. |
| 92 | + * This suspending function is cancellable. |
| 93 | + * If the [Job] of the current coroutine is completed while this suspending function is suspended, this function |
| 94 | + * *closes the underlying channel* and immediately resumes with [CancellationException]. |
| 95 | + */ |
38 | 96 | suspend fun AsynchronousSocketChannel.aRead( |
39 | 97 | buf: ByteBuffer, |
40 | 98 | timeout: Long = 0L, |
41 | 99 | timeUnit: TimeUnit = TimeUnit.MILLISECONDS |
42 | | -) = suspendCoroutine<Int> { c -> |
43 | | - this.read(buf, timeout, timeUnit, null, AsyncIOHandler(c)) |
| 100 | +) = suspendCancellableCoroutine<Int> { cont -> |
| 101 | + read(buf, timeout, timeUnit, cont, asyncIOHandler()) |
| 102 | + closeOnCancel(cont) |
44 | 103 | } |
45 | 104 |
|
| 105 | +/** |
| 106 | + * Performs [AsynchronousServerSocketChannel.write] without blocking a thread and resumes when asynchronous operation completes. |
| 107 | + * This suspending function is cancellable. |
| 108 | + * If the [Job] of the current coroutine is completed while this suspending function is suspended, this function |
| 109 | + * *closes the underlying channel* and immediately resumes with [CancellationException]. |
| 110 | + */ |
46 | 111 | suspend fun AsynchronousSocketChannel.aWrite( |
47 | 112 | buf: ByteBuffer, |
48 | 113 | timeout: Long = 0L, |
49 | 114 | timeUnit: TimeUnit = TimeUnit.MILLISECONDS |
50 | | -) = suspendCoroutine<Int> { c -> |
51 | | - this.write(buf, timeout, timeUnit, null, AsyncIOHandler(c)) |
| 115 | +) = suspendCancellableCoroutine<Int> { cont -> |
| 116 | + write(buf, timeout, timeUnit, cont, asyncIOHandler()) |
| 117 | + closeOnCancel(cont) |
52 | 118 | } |
53 | 119 |
|
54 | | -private class AsyncIOHandler<E>(val c: Continuation<E>) : CompletionHandler<E, Nothing?> { |
55 | | - override fun completed(result: E, attachment: Nothing?) { |
56 | | - c.resume(result) |
| 120 | +// ---------------- private details ---------------- |
| 121 | + |
| 122 | +private fun Channel.closeOnCancel(cont: CancellableContinuation<*>) { |
| 123 | + cont.onCompletion { |
| 124 | + if (cont.isCancelled) |
| 125 | + try { |
| 126 | + close() |
| 127 | + } catch (ex: Throwable) { |
| 128 | + // Specification says that it is Ok to call it any time, but reality is different, |
| 129 | + // so we have just to ignore exception |
| 130 | + } |
| 131 | + } |
| 132 | +} |
| 133 | + |
| 134 | +@Suppress("UNCHECKED_CAST") |
| 135 | +private fun <T> asyncIOHandler(): CompletionHandler<T, CancellableContinuation<T>> = |
| 136 | + AsyncIOHandlerAny as CompletionHandler<T, CancellableContinuation<T>> |
| 137 | + |
| 138 | +private object AsyncIOHandlerAny : CompletionHandler<Any, CancellableContinuation<Any>> { |
| 139 | + override fun completed(result: Any, cont: CancellableContinuation<Any>) { |
| 140 | + cont.resume(result) |
57 | 141 | } |
58 | 142 |
|
59 | | - override fun failed(exc: Throwable, attachment: Nothing?) { |
60 | | - c.resumeWithException(exc) |
| 143 | + override fun failed(ex: Throwable, cont: CancellableContinuation<Any>) { |
| 144 | + // just return if already cancelled and got an expected exception for that case |
| 145 | + if (ex is AsynchronousCloseException && cont.isCancelled) return |
| 146 | + cont.resumeWithException(ex) |
61 | 147 | } |
62 | 148 | } |
63 | 149 |
|
64 | | -private class AsyncVoidIOHandler(val c: Continuation<Unit>) : CompletionHandler<Void?, Nothing?> { |
65 | | - override fun completed(result: Void?, attachment: Nothing?) { |
66 | | - c.resume(Unit) |
| 150 | +private object AsyncVoidIOHandler : CompletionHandler<Void?, CancellableContinuation<Unit>> { |
| 151 | + override fun completed(result: Void?, cont: CancellableContinuation<Unit>) { |
| 152 | + cont.resume(Unit) |
67 | 153 | } |
68 | 154 |
|
69 | | - override fun failed(exc: Throwable, attachment: Nothing?) { |
70 | | - c.resumeWithException(exc) |
| 155 | + override fun failed(ex: Throwable, cont: CancellableContinuation<Unit>) { |
| 156 | + // just return if already cancelled and got an expected exception for that case |
| 157 | + if (ex is AsynchronousCloseException && cont.isCancelled) return |
| 158 | + cont.resumeWithException(ex) |
71 | 159 | } |
72 | 160 | } |
| 161 | + |
| 162 | + |
0 commit comments