@@ -114,52 +114,10 @@ public PooledConnection acquireConnection( AccessMode mode )
114114 public CompletionStage <AsyncConnection > acquireAsyncConnection ( AccessMode mode )
115115 {
116116 return freshRoutingTable ( mode )
117- .thenCompose ( routingTable -> acquire ( mode , routingTable ) )
117+ .thenCompose ( routingTable -> acquireAsync ( mode , routingTable ) )
118118 .thenApply ( connection -> new RoutingAsyncConnection ( connection , mode , this ) );
119119 }
120120
121- private CompletionStage <AsyncConnection > acquire ( AccessMode mode , RoutingTable routingTable )
122- {
123- AddressSet addresses = addressSet ( mode , routingTable );
124- CompletableFuture <AsyncConnection > result = new CompletableFuture <>();
125- acquire ( mode , addresses , result );
126- return result ;
127- }
128-
129- private void acquire ( AccessMode mode , AddressSet addresses , CompletableFuture <AsyncConnection > result )
130- {
131- BoltServerAddress address = selectAddressAsync ( mode , addresses );
132-
133- if ( address == null )
134- {
135- result .completeExceptionally ( new SessionExpiredException (
136- "Failed to obtain connection towards " + mode + " server. " +
137- "Known routing table is: " + routingTable ) );
138- return ;
139- }
140-
141- asyncConnectionPool .acquire ( address ).whenComplete ( ( connection , error ) ->
142- {
143- if ( error != null )
144- {
145- if ( error instanceof ServiceUnavailableException )
146- {
147- log .error ( "Failed to obtain a connection towards address " + address , error );
148- forget ( address );
149- eventExecutorGroup .next ().execute ( () -> acquire ( mode , addresses , result ) );
150- }
151- else
152- {
153- result .completeExceptionally ( error );
154- }
155- }
156- else
157- {
158- result .complete ( connection );
159- }
160- } );
161- }
162-
163121 @ Override
164122 public void onConnectionFailure ( BoltServerAddress address )
165123 {
@@ -294,6 +252,48 @@ private synchronized void clusterCompositionLookupFailed( Throwable error )
294252 routingTableFuture .completeExceptionally ( error );
295253 }
296254
255+ private CompletionStage <AsyncConnection > acquireAsync ( AccessMode mode , RoutingTable routingTable )
256+ {
257+ AddressSet addresses = addressSet ( mode , routingTable );
258+ CompletableFuture <AsyncConnection > result = new CompletableFuture <>();
259+ acquireAsync ( mode , addresses , result );
260+ return result ;
261+ }
262+
263+ private void acquireAsync ( AccessMode mode , AddressSet addresses , CompletableFuture <AsyncConnection > result )
264+ {
265+ BoltServerAddress address = selectAddressAsync ( mode , addresses );
266+
267+ if ( address == null )
268+ {
269+ result .completeExceptionally ( new SessionExpiredException (
270+ "Failed to obtain connection towards " + mode + " server. " +
271+ "Known routing table is: " + routingTable ) );
272+ return ;
273+ }
274+
275+ asyncConnectionPool .acquire ( address ).whenComplete ( ( connection , error ) ->
276+ {
277+ if ( error != null )
278+ {
279+ if ( error instanceof ServiceUnavailableException )
280+ {
281+ log .error ( "Failed to obtain a connection towards address " + address , error );
282+ forget ( address );
283+ eventExecutorGroup .next ().execute ( () -> acquireAsync ( mode , addresses , result ) );
284+ }
285+ else
286+ {
287+ result .completeExceptionally ( error );
288+ }
289+ }
290+ else
291+ {
292+ result .complete ( connection );
293+ }
294+ } );
295+ }
296+
297297 private static AddressSet addressSet ( AccessMode mode , RoutingTable routingTable )
298298 {
299299 switch ( mode )
0 commit comments