@@ -66,7 +66,7 @@ public void start() {
6666 */
6767 @ Override
6868 public void requestStart (){
69- _myPort .get (). requestStart ();
69+ _myPort .requestStart ();
7070 }
7171
7272 /**
@@ -78,7 +78,7 @@ public void requestStart(){
7878 */
7979 @ Override
8080 public void requestDone (){
81- _myPort .get (). requestDone ();
81+ _myPort .requestDone ();
8282 }
8383
8484 /**
@@ -87,7 +87,7 @@ public void requestDone(){
8787 @ Override
8888 public void requestEnsureConnection (){
8989 checkMaster ( false , true );
90- _myPort .get (). requestEnsureConnection ();
90+ _myPort .requestEnsureConnection ();
9191 }
9292
9393 void _checkClosed (){
@@ -133,8 +133,7 @@ public WriteResult say( DB db , OutMessage m , WriteConcern concern , ServerAddr
133133 _checkClosed ();
134134 checkMaster ( false , true );
135135
136- MyPort mp = _myPort .get ();
137- DBPort port = mp .get ( true , ReadPreference .primary (), hostNeeded );
136+ DBPort port = _myPort .get (true , ReadPreference .primary (), hostNeeded );
138137
139138 try {
140139 port .checkAuth ( db .getMongo () );
@@ -147,7 +146,7 @@ public WriteResult say( DB db , OutMessage m , WriteConcern concern , ServerAddr
147146 }
148147 }
149148 catch ( IOException ioe ){
150- mp .error ( port , ioe );
149+ _myPort .error (port , ioe );
151150 _error ( ioe , false );
152151
153152 if ( concern .raiseNetworkErrors () )
@@ -162,11 +161,11 @@ public WriteResult say( DB db , OutMessage m , WriteConcern concern , ServerAddr
162161 throw me ;
163162 }
164163 catch ( RuntimeException re ){
165- mp .error ( port , re );
164+ _myPort .error (port , re );
166165 throw re ;
167166 }
168167 finally {
169- mp .done ( port );
168+ _myPort .done (port );
170169 m .doneWithMessage ();
171170 }
172171 }
@@ -236,8 +235,7 @@ private Response innerCall(final DB db, final DBCollection coll, final OutMessag
236235 if (!secondaryOk || getReplicaSetStatus () == null )
237236 checkMaster ( false , !secondaryOk );
238237
239- final MyPort mp = _myPort .get ();
240- final DBPort port = mp .get ( false , readPref , hostNeeded );
238+ final DBPort port = _myPort .get (false , readPref , hostNeeded );
241239
242240 Response res = null ;
243241 boolean retry = false ;
@@ -248,7 +246,7 @@ private Response innerCall(final DB db, final DBCollection coll, final OutMessag
248246 throw new MongoException ( "ids don't match" );
249247 }
250248 catch ( IOException ioe ){
251- mp .error ( port , ioe );
249+ _myPort .error (port , ioe );
252250 retry = retries > 0 && !coll ._name .equals ( "$cmd" )
253251 && !(ioe instanceof SocketTimeoutException ) && _error ( ioe , secondaryOk );
254252 if ( !retry ){
@@ -257,10 +255,10 @@ private Response innerCall(final DB db, final DBCollection coll, final OutMessag
257255 }
258256 }
259257 catch ( RuntimeException re ){
260- mp .error ( port , re );
258+ _myPort .error (port , re );
261259 throw re ;
262260 } finally {
263- mp .done ( port );
261+ _myPort .done (port );
264262 }
265263
266264 if (retry )
@@ -372,27 +370,29 @@ class MyPort {
372370
373371 DBPort get ( boolean keep , ReadPreference readPref , ServerAddress hostNeeded ){
374372
375- if ( hostNeeded != null ){
376- if (_requestPort != null && _requestPort .serverAddress ().equals (hostNeeded )) {
377- return _requestPort ;
373+ DBPort requestPort = getPinnedRequestPort ();
374+
375+ if ( hostNeeded != null ) {
376+ if (requestPort != null && requestPort .serverAddress ().equals (hostNeeded )) {
377+ return requestPort ;
378378 }
379379
380380 // asked for a specific host
381381 return _portHolder .get ( hostNeeded ).get ();
382382 }
383383
384- if ( _requestPort != null ){
384+ if ( requestPort != null ){
385385 // we are within a request, and have a port, should stick to it
386- if ( _requestPort .getPool () == _masterPortPool || !keep ) {
386+ if ( requestPort .getPool () == _masterPortPool || !keep ) {
387387 // if keep is false, it's a read, so we use port even if master changed
388- return _requestPort ;
388+ return requestPort ;
389389 }
390390
391391 // it's write and master has changed
392392 // we fall back on new master and try to go on with request
393393 // this may not be best behavior if spec of request is to stick with same server
394- _requestPort .getPool ().done (_requestPort );
395- _requestPort = null ;
394+ requestPort .getPool ().done (requestPort );
395+ pinnedRequestStatusThreadLocal . get (). requestPort = null ;
396396 }
397397
398398 DBPort p ;
@@ -414,17 +414,19 @@ DBPort get( boolean keep , ReadPreference readPref, ServerAddress hostNeeded ){
414414 p = _portHolder .get (node .getServerAddress ()).get ();
415415 }
416416
417- if ( _inRequest ) {
418- // if within request, remember port to stick to same server
419- _requestPort = p ;
417+ // if within request, remember port to stick to same server
418+ if ( pinnedRequestStatusThreadLocal . get () != null ) {
419+ pinnedRequestStatusThreadLocal . get (). requestPort = p ;
420420 }
421421
422422 return p ;
423423 }
424424
425- void done ( DBPort p ){
425+ void done ( DBPort p ) {
426+ DBPort requestPort = getPinnedRequestPort ();
427+
426428 // keep request port
427- if ( p != _requestPort ) {
429+ if (p != requestPort ) {
428430 p .getPool ().done (p );
429431 }
430432 }
@@ -436,7 +438,7 @@ void done( DBPort p ){
436438 */
437439 void error ( DBPort p , Exception e ){
438440 p .close ();
439- _requestPort = null ;
441+ pinnedRequestStatusThreadLocal . remove () ;
440442
441443 // depending on type of error, may need to close other connections in pool
442444 boolean recoverable = p .getPool ().gotError (e );
@@ -449,28 +451,39 @@ void error( DBPort p , Exception e ){
449451 }
450452
451453 void requestEnsureConnection (){
452- if ( ! _inRequest )
454+ if ( pinnedRequestStatusThreadLocal . get () == null )
453455 return ;
454456
455- if ( _requestPort != null )
457+ if ( getPinnedRequestPort () != null )
456458 return ;
457459
458- _requestPort = _masterPortPool .get ();
460+ pinnedRequestStatusThreadLocal . get (). requestPort = _masterPortPool .get ();
459461 }
460462
461463 void requestStart (){
462- _inRequest = true ;
464+ pinnedRequestStatusThreadLocal . set ( new PinnedRequestStatus ()) ;
463465 }
464466
465467 void requestDone (){
466- if ( _requestPort != null )
467- _requestPort .getPool ().done ( _requestPort );
468- _requestPort = null ;
469- _inRequest = false ;
468+ DBPort requestPort = getPinnedRequestPort ();
469+ if ( requestPort != null )
470+ requestPort .getPool ().done ( requestPort );
471+ pinnedRequestStatusThreadLocal .remove ();
472+ }
473+
474+ PinnedRequestStatus getPinnedRequestStatus () {
475+ return pinnedRequestStatusThreadLocal .get ();
476+ }
477+
478+ DBPort getPinnedRequestPort () {
479+ return pinnedRequestStatusThreadLocal .get () != null ? pinnedRequestStatusThreadLocal .get ().requestPort : null ;
470480 }
471481
472- DBPort _requestPort ;
473- boolean _inRequest ;
482+ private final ThreadLocal <PinnedRequestStatus > pinnedRequestStatusThreadLocal = new ThreadLocal <PinnedRequestStatus >();
483+ }
484+
485+ static class PinnedRequestStatus {
486+ DBPort requestPort ;
474487 }
475488
476489 void checkMaster ( boolean force , boolean failIfNoMaster ){
@@ -566,10 +579,6 @@ public void close(){
566579 _connectionStatus = null ;
567580 } catch (final Throwable t ) { /* nada */ }
568581 }
569-
570- // below this will remove the myport for this thread only
571- // client using thread pool in web framework may need to call close() from all threads
572- _myPort .remove ();
573582 }
574583
575584 /**
@@ -598,15 +607,14 @@ public boolean isOpen(){
598607
599608 @ Override
600609 public CommandResult authenticate (MongoCredential credentials ) {
601- final MyPort mp = _myPort .get ();
602- final DBPort port = mp .get (false , ReadPreference .primaryPreferred (), null );
610+ final DBPort port = _myPort .get (false , ReadPreference .primaryPreferred (), null );
603611
604612 try {
605613 CommandResult result = port .authenticate (_mongo , credentials );
606614 _mongo .getAuthority ().getCredentialsStore ().add (credentials );
607615 return result ;
608616 } finally {
609- mp .done (port );
617+ _myPort .done (port );
610618 }
611619 }
612620
@@ -621,7 +629,7 @@ public int getMaxBsonObjectSize() {
621629
622630 // expose for unit testing
623631 MyPort getMyPort () {
624- return _myPort . get () ;
632+ return _myPort ;
625633 }
626634
627635 private volatile DBPortPool _masterPortPool ;
@@ -634,10 +642,5 @@ MyPort getMyPort() {
634642 private volatile int _maxBsonObjectSize ;
635643 private volatile Boolean _isMongosDirectConnection ;
636644
637- private ThreadLocal <MyPort > _myPort = new ThreadLocal <MyPort >(){
638- protected MyPort initialValue (){
639- return new MyPort ();
640- }
641- };
642-
645+ MyPort _myPort = new MyPort ();
643646}
0 commit comments