4444import java .util .concurrent .Executors ;
4545import java .util .concurrent .ScheduledExecutorService ;
4646import java .util .concurrent .TimeUnit ;
47+ import java .util .concurrent .atomic .AtomicBoolean ;
4748import java .util .concurrent .atomic .AtomicInteger ;
4849
4950import static com .mongodb .assertions .Assertions .isTrue ;
@@ -372,47 +373,48 @@ private ConnectionId getId(final InternalConnection internalConnection) {
372373 }
373374
374375 private class PooledConnection implements InternalConnection {
375- private volatile UsageTrackingInternalConnection wrapped ;
376+ private final UsageTrackingInternalConnection wrapped ;
377+ private final AtomicBoolean isClosed = new AtomicBoolean ();
376378
377379 public PooledConnection (final UsageTrackingInternalConnection wrapped ) {
378380 this .wrapped = notNull ("wrapped" , wrapped );
379381 }
380382
381383 @ Override
382384 public void open () {
383- isTrue ("open" , wrapped != null );
385+ isTrue ("open" , ! isClosed . get () );
384386 wrapped .open ();
385387 }
386388
387389 @ Override
388390 public void openAsync (final SingleResultCallback <Void > callback ) {
389- isTrue ("open" , wrapped != null );
391+ isTrue ("open" , ! isClosed . get () );
390392 wrapped .openAsync (callback );
391393 }
392394
393395 @ Override
394396 public void close () {
395- if (wrapped != null ) {
396- if (!closed ) {
397+ // All but the first call is a no-op
398+ if (!isClosed .getAndSet (true )) {
399+ if (!DefaultConnectionPool .this .closed ) {
397400 connectionPoolListener .connectionCheckedIn (new ConnectionCheckedInEvent (getId (wrapped )));
398401 if (LOGGER .isTraceEnabled ()) {
399402 LOGGER .trace (format ("Checked in connection [%s] to server %s" , getId (wrapped ), serverId .getAddress ()));
400403 }
401404 }
402405 pool .release (wrapped , wrapped .isClosed () || shouldPrune (wrapped ));
403- wrapped = null ;
404406 }
405407 }
406408
407409 @ Override
408410 public boolean opened () {
409- isTrue ("open" , wrapped != null );
411+ isTrue ("open" , ! isClosed . get () );
410412 return wrapped .opened ();
411413 }
412414
413415 @ Override
414416 public boolean isClosed () {
415- return wrapped == null || wrapped .isClosed ();
417+ return isClosed . get () || wrapped .isClosed ();
416418 }
417419
418420 @ Override
@@ -422,7 +424,7 @@ public ByteBuf getBuffer(final int capacity) {
422424
423425 @ Override
424426 public void sendMessage (final List <ByteBuf > byteBuffers , final int lastRequestId ) {
425- isTrue ("open" , wrapped != null );
427+ isTrue ("open" , ! isClosed . get () );
426428 try {
427429 wrapped .sendMessage (byteBuffers , lastRequestId );
428430 } catch (MongoException e ) {
@@ -433,7 +435,7 @@ public void sendMessage(final List<ByteBuf> byteBuffers, final int lastRequestId
433435
434436 @ Override
435437 public ResponseBuffers receiveMessage (final int responseTo ) {
436- isTrue ("open" , wrapped != null );
438+ isTrue ("open" , ! isClosed . get () );
437439 try {
438440 return wrapped .receiveMessage (responseTo );
439441 } catch (MongoException e ) {
@@ -444,7 +446,7 @@ public ResponseBuffers receiveMessage(final int responseTo) {
444446
445447 @ Override
446448 public void sendMessageAsync (final List <ByteBuf > byteBuffers , final int lastRequestId , final SingleResultCallback <Void > callback ) {
447- isTrue ("open" , wrapped != null );
449+ isTrue ("open" , ! isClosed . get () );
448450 wrapped .sendMessageAsync (byteBuffers , lastRequestId , new SingleResultCallback <Void >() {
449451 @ Override
450452 public void onResult (final Void result , final Throwable t ) {
@@ -458,7 +460,7 @@ public void onResult(final Void result, final Throwable t) {
458460
459461 @ Override
460462 public void receiveMessageAsync (final int responseTo , final SingleResultCallback <ResponseBuffers > callback ) {
461- isTrue ("open" , wrapped != null );
463+ isTrue ("open" , ! isClosed . get () );
462464 wrapped .receiveMessageAsync (responseTo , new SingleResultCallback <ResponseBuffers >() {
463465 @ Override
464466 public void onResult (final ResponseBuffers result , final Throwable t ) {
@@ -472,7 +474,7 @@ public void onResult(final ResponseBuffers result, final Throwable t) {
472474
473475 @ Override
474476 public ConnectionDescription getDescription () {
475- isTrue ("open" , wrapped != null );
477+ isTrue ("open" , ! isClosed . get () );
476478 return wrapped .getDescription ();
477479 }
478480 }
0 commit comments