5151import org .neo4j .driver .v1 .exceptions .SessionExpiredException ;
5252
5353import static java .util .concurrent .CompletableFuture .completedFuture ;
54- import static org .neo4j .driver .internal .async .Futures .failedFuture ;
5554
5655public class LoadBalancer implements ConnectionProvider , RoutingErrorHandler , AutoCloseable
5756{
@@ -62,6 +61,7 @@ public class LoadBalancer implements ConnectionProvider, RoutingErrorHandler, Au
6261 private final RoutingTable routingTable ;
6362 private final Rediscovery rediscovery ;
6463 private final LoadBalancingStrategy loadBalancingStrategy ;
64+ private final EventExecutorGroup eventExecutorGroup ;
6565 private final Logger log ;
6666
6767 private CompletableFuture <RoutingTable > refreshRoutingTableFuture ;
@@ -72,26 +72,28 @@ public LoadBalancer( BoltServerAddress initialRouter, RoutingSettings settings,
7272 {
7373 this ( connections , asyncConnectionPool , new ClusterRoutingTable ( clock , initialRouter ),
7474 createRediscovery ( initialRouter , settings , eventExecutorGroup , clock , logging ),
75- loadBalancerLogger ( logging ), loadBalancingStrategy );
75+ loadBalancerLogger ( logging ), loadBalancingStrategy , eventExecutorGroup );
7676 }
7777
7878 // Used only in testing
7979 public LoadBalancer ( ConnectionPool connections , AsyncConnectionPool asyncConnectionPool ,
80- RoutingTable routingTable , Rediscovery rediscovery , Logging logging )
80+ RoutingTable routingTable , Rediscovery rediscovery , EventExecutorGroup eventExecutorGroup , Logging logging )
8181 {
8282 this ( connections , asyncConnectionPool , routingTable , rediscovery , loadBalancerLogger ( logging ),
83- new LeastConnectedLoadBalancingStrategy ( connections , asyncConnectionPool , logging ) );
83+ new LeastConnectedLoadBalancingStrategy ( connections , asyncConnectionPool , logging ),
84+ eventExecutorGroup );
8485 }
8586
8687 private LoadBalancer ( ConnectionPool connections , AsyncConnectionPool asyncConnectionPool ,
8788 RoutingTable routingTable , Rediscovery rediscovery , Logger log ,
88- LoadBalancingStrategy loadBalancingStrategy )
89+ LoadBalancingStrategy loadBalancingStrategy , EventExecutorGroup eventExecutorGroup )
8990 {
9091 this .connections = connections ;
9192 this .asyncConnectionPool = asyncConnectionPool ;
9293 this .routingTable = routingTable ;
9394 this .rediscovery = rediscovery ;
9495 this .loadBalancingStrategy = loadBalancingStrategy ;
96+ this .eventExecutorGroup = eventExecutorGroup ;
9597 this .log = log ;
9698
9799 if ( connections != null )
@@ -111,22 +113,51 @@ public PooledConnection acquireConnection( AccessMode mode )
111113 @ Override
112114 public CompletionStage <AsyncConnection > acquireAsyncConnection ( AccessMode mode )
113115 {
114- return freshRoutingTable ( mode ).thenCompose ( routingTable ->
115- {
116- AddressSet addressSet = addressSet ( mode , routingTable );
117- BoltServerAddress address = selectAddressAsync ( mode , addressSet );
116+ return freshRoutingTable ( mode )
117+ .thenCompose ( routingTable -> acquire ( mode , routingTable ) )
118+ .thenApply ( connection -> new RoutingAsyncConnection ( connection , mode , this ) );
119+ }
120+
121+ private CompletionStage <AsyncConnection > acquire ( AccessMode mode , RoutingTable routingTable )
122+ {
123+ AddressSet addresses = addressSet ( mode , routingTable );
124+ CompletableFuture <AsyncConnection > result = new CompletableFuture <>();
125+ acquire ( mode , addresses , result );
126+ return result ;
127+ }
118128
119- // todo: loop like in sync version until we get a successful connection
129+ private void acquire ( AccessMode mode , AddressSet addresses , CompletableFuture <AsyncConnection > result )
130+ {
131+ BoltServerAddress address = selectAddressAsync ( mode , addresses );
132+
133+ if ( address == null )
134+ {
135+ result .completeExceptionally ( new SessionExpiredException (
136+ "Failed to obtain connection towards " + mode + " server. " +
137+ "Known routing table is: " + routingTable ) );
138+ return ;
139+ }
120140
121- if ( address == null )
141+ asyncConnectionPool .acquire ( address ).whenComplete ( ( connection , error ) ->
142+ {
143+ if ( error != null )
122144 {
123- return failedFuture ( new SessionExpiredException (
124- "Failed to obtain connection towards " + mode + " server. " +
125- "Known routing table is: " + routingTable ) );
145+ if ( error instanceof ServiceUnavailableException )
146+ {
147+ log .error ( "Failed to obtain a connection towards address " + address , error );
148+ forget ( address );
149+ eventExecutorGroup .next ().execute ( () -> acquire ( mode , addresses , result ) );
150+ }
151+ else
152+ {
153+ result .completeExceptionally ( error );
154+ }
126155 }
127-
128- return asyncConnectionPool .acquire ( address );
129- } ).thenApply ( connection -> new RoutingAsyncConnection ( connection , mode , this ) );
156+ else
157+ {
158+ result .complete ( connection );
159+ }
160+ } );
130161 }
131162
132163 @ Override
@@ -172,7 +203,10 @@ private synchronized void forget( BoltServerAddress address )
172203 // First remove from the load balancer, to prevent concurrent threads from making connections to them.
173204 routingTable .forget ( address );
174205 // drop all current connections to the address
175- connections .purge ( address );
206+ if ( connections != null )
207+ {
208+ connections .purge ( address );
209+ }
176210 asyncConnectionPool .purge ( address );
177211 }
178212
0 commit comments