|
31 | 31 | import java.nio.channels.InterruptedByTimeoutException; |
32 | 32 | import java.util.Iterator; |
33 | 33 | import java.util.List; |
| 34 | +import java.util.concurrent.atomic.AtomicReference; |
34 | 35 |
|
35 | 36 | import static com.mongodb.assertions.Assertions.isTrue; |
36 | 37 | import static java.util.concurrent.TimeUnit.MILLISECONDS; |
@@ -75,21 +76,7 @@ public void openAsync(final AsyncCompletionHandler<Void> handler) { |
75 | 76 | channel.setOption(StandardSocketOptions.SO_SNDBUF, settings.getSendBufferSize()); |
76 | 77 | } |
77 | 78 |
|
78 | | - channel.connect(serverAddress.getSocketAddress(), null, new CompletionHandler<Void, Object>() { |
79 | | - @Override |
80 | | - public void completed(final Void result, final Object attachment) { |
81 | | - handler.completed(null); |
82 | | - } |
83 | | - |
84 | | - @Override |
85 | | - public void failed(final Throwable exc, final Object attachment) { |
86 | | - if (exc instanceof ConnectException) { |
87 | | - handler.failed(new MongoSocketOpenException("Exception opening socket", getAddress(), exc)); |
88 | | - } else { |
89 | | - handler.failed(exc); |
90 | | - } |
91 | | - } |
92 | | - }); |
| 79 | + channel.connect(serverAddress.getSocketAddress(), null, new OpenCompletionHandler(handler)); |
93 | 80 | } catch (IOException e) { |
94 | 81 | handler.failed(new MongoSocketOpenException("Exception opening socket", serverAddress, e)); |
95 | 82 | } catch (Throwable t) { |
@@ -188,51 +175,101 @@ public void failed(final Throwable t) { |
188 | 175 | private class AsyncWritableByteChannelAdapter implements AsyncWritableByteChannel { |
189 | 176 | @Override |
190 | 177 | public void write(final ByteBuffer src, final AsyncCompletionHandler<Void> handler) { |
191 | | - channel.write(src, null, new CompletionHandler<Integer, Object>() { |
192 | | - @Override |
193 | | - public void completed(final Integer result, final Object attachment) { |
194 | | - handler.completed(null); |
195 | | - } |
| 178 | + channel.write(src, null, new WriteCompletionHandler(handler)); |
| 179 | + } |
196 | 180 |
|
197 | | - @Override |
198 | | - public void failed(final Throwable exc, final Object attachment) { |
199 | | - handler.failed(exc); |
200 | | - } |
201 | | - }); |
| 181 | + private class WriteCompletionHandler extends BaseCompletionHandler<Void, Integer, Object> { |
| 182 | + |
| 183 | + public WriteCompletionHandler(final AsyncCompletionHandler<Void> handler) { |
| 184 | + super(handler); |
| 185 | + } |
| 186 | + |
| 187 | + @Override |
| 188 | + public void completed(final Integer result, final Object attachment) { |
| 189 | + AsyncCompletionHandler<Void> localHandler = getHandlerAndClear(); |
| 190 | + localHandler.completed(null); |
| 191 | + } |
| 192 | + |
| 193 | + @Override |
| 194 | + public void failed(final Throwable exc, final Object attachment) { |
| 195 | + AsyncCompletionHandler<Void> localHandler = getHandlerAndClear(); |
| 196 | + localHandler.failed(exc); |
| 197 | + } |
202 | 198 | } |
203 | 199 | } |
204 | 200 |
|
205 | | - private final class BasicCompletionHandler implements CompletionHandler<Integer, Void> { |
206 | | - private final ByteBuf dst; |
207 | | - private final AsyncCompletionHandler<ByteBuf> handler; |
| 201 | + private final class BasicCompletionHandler extends BaseCompletionHandler<ByteBuf, Integer, Void> { |
| 202 | + private final AtomicReference<ByteBuf> byteBufReference; |
208 | 203 |
|
209 | 204 | private BasicCompletionHandler(final ByteBuf dst, final AsyncCompletionHandler<ByteBuf> handler) { |
210 | | - this.dst = dst; |
211 | | - this.handler = handler; |
| 205 | + super(handler); |
| 206 | + this.byteBufReference = new AtomicReference<ByteBuf>(dst); |
212 | 207 | } |
213 | 208 |
|
214 | 209 | @Override |
215 | 210 | public void completed(final Integer result, final Void attachment) { |
| 211 | + AsyncCompletionHandler<ByteBuf> localHandler = getHandlerAndClear(); |
| 212 | + ByteBuf localByteBuf = byteBufReference.getAndSet(null); |
216 | 213 | if (result == -1) { |
217 | | - dst.release(); |
218 | | - handler.failed(new MongoSocketReadException("Prematurely reached end of stream", serverAddress)); |
219 | | - } else if (!dst.hasRemaining()) { |
220 | | - dst.flip(); |
221 | | - handler.completed(dst); |
| 214 | + localByteBuf.release(); |
| 215 | + localHandler.failed(new MongoSocketReadException("Prematurely reached end of stream", serverAddress)); |
| 216 | + } else if (!localByteBuf.hasRemaining()) { |
| 217 | + localByteBuf.flip(); |
| 218 | + localHandler.completed(localByteBuf); |
222 | 219 | } else { |
223 | | - channel.read(dst.asNIO(), settings.getReadTimeout(MILLISECONDS), MILLISECONDS, null, |
224 | | - new BasicCompletionHandler(dst, handler)); |
| 220 | + channel.read(localByteBuf.asNIO(), settings.getReadTimeout(MILLISECONDS), MILLISECONDS, null, |
| 221 | + new BasicCompletionHandler(localByteBuf, localHandler)); |
225 | 222 | } |
226 | 223 | } |
227 | 224 |
|
228 | 225 | @Override |
229 | 226 | public void failed(final Throwable t, final Void attachment) { |
230 | | - dst.release(); |
| 227 | + AsyncCompletionHandler<ByteBuf> localHandler = getHandlerAndClear(); |
| 228 | + ByteBuf localByteBuf = byteBufReference.getAndSet(null); |
| 229 | + localByteBuf.release(); |
231 | 230 | if (t instanceof InterruptedByTimeoutException) { |
232 | | - handler.failed(new MongoSocketReadTimeoutException("Timeout while receiving message", serverAddress, t)); |
| 231 | + localHandler.failed(new MongoSocketReadTimeoutException("Timeout while receiving message", serverAddress, t)); |
233 | 232 | } else { |
234 | | - handler.failed(t); |
| 233 | + localHandler.failed(t); |
235 | 234 | } |
236 | 235 | } |
237 | 236 | } |
| 237 | + |
| 238 | + private class OpenCompletionHandler extends BaseCompletionHandler<Void, Void, Object> { |
| 239 | + public OpenCompletionHandler(final AsyncCompletionHandler<Void> handler) { |
| 240 | + super(handler); |
| 241 | + } |
| 242 | + |
| 243 | + @Override |
| 244 | + public void completed(final Void result, final Object attachment) { |
| 245 | + AsyncCompletionHandler<Void> localHandler = getHandlerAndClear(); |
| 246 | + localHandler.completed(null); |
| 247 | + } |
| 248 | + |
| 249 | + @Override |
| 250 | + public void failed(final Throwable exc, final Object attachment) { |
| 251 | + AsyncCompletionHandler<Void> localHandler = getHandlerAndClear(); |
| 252 | + if (exc instanceof ConnectException) { |
| 253 | + localHandler.failed(new MongoSocketOpenException("Exception opening socket", getAddress(), exc)); |
| 254 | + } else { |
| 255 | + localHandler.failed(exc); |
| 256 | + } |
| 257 | + } |
| 258 | + } |
| 259 | + |
| 260 | + // Private base class for all CompletionHandler implementors that ensures the upstream handler is |
| 261 | + // set to null before it is used. This is to work around an observed issue with implementations of |
| 262 | + // AsynchronousSocketChannel that fail to clear references to handlers stored in instance fields of |
| 263 | + // the class. |
| 264 | + private abstract static class BaseCompletionHandler<T, V, A> implements CompletionHandler<V, A> { |
| 265 | + private final AtomicReference<AsyncCompletionHandler<T>> handlerReference; |
| 266 | + |
| 267 | + public BaseCompletionHandler(final AsyncCompletionHandler<T> handler) { |
| 268 | + this.handlerReference = new AtomicReference<AsyncCompletionHandler<T>>(handler); |
| 269 | + } |
| 270 | + |
| 271 | + protected AsyncCompletionHandler<T> getHandlerAndClear() { |
| 272 | + return handlerReference.getAndSet(null); |
| 273 | + } |
| 274 | + } |
238 | 275 | } |
0 commit comments