Skip to content

Commit 010f784

Browse files
committed
Relax connection termination policy in routing driver
Previously routing driver terminated all connections towards a particular address when one of active connections had a network error. Connections were also terminated when new routing table did not contain some address that was present in the previous routing table. Such behaviour might be problematic because it results in terminated queries. Network errors might have been temporary but always resulted in termination of active queries. This commit makes driver keep connections towards machine that had failure but remove it from the routing table. This is done to prevent subsequent queries from using it until rediscovery. After rediscovery address can either appear again in the routing procedure response or not. If it is returned then driver will re-add it to routing table and old pool with all connections will be reused. If address is missing from routing table and pool has no active connections then driver will close the pool.
1 parent b92eac3 commit 010f784

File tree

18 files changed

+652
-255
lines changed

18 files changed

+652
-255
lines changed

driver/src/main/java/org/neo4j/driver/internal/async/NettyConnection.java

Lines changed: 5 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,9 @@
2020

2121
import io.netty.channel.Channel;
2222
import io.netty.channel.pool.ChannelPool;
23-
import io.netty.util.concurrent.Promise;
2423

2524
import java.util.Map;
25+
import java.util.concurrent.CompletableFuture;
2626
import java.util.concurrent.CompletionStage;
2727
import java.util.concurrent.atomic.AtomicBoolean;
2828

@@ -39,16 +39,14 @@
3939
import org.neo4j.driver.internal.util.ServerVersion;
4040
import org.neo4j.driver.v1.Value;
4141

42-
import static java.util.concurrent.CompletableFuture.completedFuture;
43-
import static org.neo4j.driver.internal.util.Futures.asCompletionStage;
44-
4542
public class NettyConnection implements Connection
4643
{
4744
private final Channel channel;
4845
private final InboundMessageDispatcher messageDispatcher;
4946
private final BoltServerAddress serverAddress;
5047
private final ServerVersion serverVersion;
5148
private final ChannelPool channelPool;
49+
private final CompletableFuture<Void> releaseFuture;
5250
private final Clock clock;
5351

5452
private final AtomicBoolean open = new AtomicBoolean( true );
@@ -61,6 +59,7 @@ public NettyConnection( Channel channel, ChannelPool channelPool, Clock clock )
6159
this.serverAddress = ChannelAttributes.serverAddress( channel );
6260
this.serverVersion = ChannelAttributes.serverVersion( channel );
6361
this.channelPool = channelPool;
62+
this.releaseFuture = new CompletableFuture<>();
6463
this.clock = clock;
6564
}
6665

@@ -111,14 +110,9 @@ public CompletionStage<Void> release()
111110
{
112111
if ( open.compareAndSet( true, false ) )
113112
{
114-
Promise<Void> releasePromise = channel.eventLoop().newPromise();
115-
reset( new ResetResponseHandler( channel, channelPool, messageDispatcher, clock, releasePromise ) );
116-
return asCompletionStage( releasePromise );
117-
}
118-
else
119-
{
120-
return completedFuture( null );
113+
reset( new ResetResponseHandler( channel, channelPool, messageDispatcher, clock, releaseFuture ) );
121114
}
115+
return releaseFuture;
122116
}
123117

124118
@Override

driver/src/main/java/org/neo4j/driver/internal/async/pool/ConnectionPoolImpl.java

Lines changed: 33 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import io.netty.util.concurrent.Future;
2626

2727
import java.util.Map;
28+
import java.util.Set;
2829
import java.util.concurrent.CompletionException;
2930
import java.util.concurrent.CompletionStage;
3031
import java.util.concurrent.ConcurrentHashMap;
@@ -58,10 +59,16 @@ public class ConnectionPoolImpl implements ConnectionPool
5859

5960
public ConnectionPoolImpl( ChannelConnector connector, Bootstrap bootstrap, PoolSettings settings,
6061
Logging logging, Clock clock )
62+
{
63+
this( connector, bootstrap, new ActiveChannelTracker( logging ), settings, logging, clock );
64+
}
65+
66+
ConnectionPoolImpl( ChannelConnector connector, Bootstrap bootstrap, ActiveChannelTracker activeChannelTracker,
67+
PoolSettings settings, Logging logging, Clock clock )
6168
{
6269
this.connector = connector;
6370
this.bootstrap = bootstrap;
64-
this.activeChannelTracker = new ActiveChannelTracker( logging );
71+
this.activeChannelTracker = activeChannelTracker;
6572
this.channelHealthChecker = new NettyChannelHealthChecker( settings, clock, logging );
6673
this.settings = settings;
6774
this.clock = clock;
@@ -101,6 +108,30 @@ public void purge( BoltServerAddress address )
101108
}
102109
}
103110

111+
@Override
112+
public void retainAll( Set<BoltServerAddress> addressesToRetain )
113+
{
114+
for ( BoltServerAddress address : pools.keySet() )
115+
{
116+
if ( !addressesToRetain.contains( address ) )
117+
{
118+
int activeChannels = activeChannelTracker.activeChannelCount( address );
119+
if ( activeChannels == 0 )
120+
{
121+
// address is not present in updated routing table and has no active connections
122+
// it's now safe to terminate corresponding connection pool and forget about it
123+
124+
ChannelPool pool = pools.remove( address );
125+
if ( pool != null )
126+
{
127+
log.info( "Purging idle connections towards %s", address );
128+
pool.close();
129+
}
130+
}
131+
}
132+
}
133+
}
134+
104135
@Override
105136
public boolean hasAddress( BoltServerAddress address )
106137
{
@@ -157,7 +188,7 @@ private ChannelPool getOrCreatePool( BoltServerAddress address )
157188
return pool;
158189
}
159190

160-
private NettyChannelPool newPool( BoltServerAddress address )
191+
ChannelPool newPool( BoltServerAddress address )
161192
{
162193
return new NettyChannelPool( address, connector, bootstrap, activeChannelTracker, channelHealthChecker,
163194
settings.connectionAcquisitionTimeout(), settings.maxConnectionPoolSize() );

driver/src/main/java/org/neo4j/driver/internal/cluster/AddressSet.java

Lines changed: 2 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -39,54 +39,9 @@ public int size()
3939
return addresses.length;
4040
}
4141

42-
public synchronized void update( Set<BoltServerAddress> addresses, Set<BoltServerAddress> removed )
42+
public synchronized void update( Set<BoltServerAddress> addresses )
4343
{
44-
BoltServerAddress[] prev = this.addresses;
45-
if ( addresses.isEmpty() )
46-
{
47-
this.addresses = NONE;
48-
return;
49-
}
50-
if ( prev.length == 0 )
51-
{
52-
this.addresses = addresses.toArray( NONE );
53-
return;
54-
}
55-
BoltServerAddress[] copy = null;
56-
if ( addresses.size() != prev.length )
57-
{
58-
copy = new BoltServerAddress[addresses.size()];
59-
}
60-
int j = 0;
61-
for ( int i = 0; i < prev.length; i++ )
62-
{
63-
if ( addresses.remove( prev[i] ) )
64-
{
65-
if ( copy != null )
66-
{
67-
copy[j++] = prev[i];
68-
}
69-
}
70-
else
71-
{
72-
removed.add( prev[i] );
73-
if ( copy == null )
74-
{
75-
copy = new BoltServerAddress[prev.length];
76-
System.arraycopy( prev, 0, copy, 0, i );
77-
j = i;
78-
}
79-
}
80-
}
81-
if ( copy == null )
82-
{
83-
return;
84-
}
85-
for ( BoltServerAddress address : addresses )
86-
{
87-
copy[j++] = address;
88-
}
89-
this.addresses = copy;
44+
this.addresses = addresses.toArray( NONE );
9045
}
9146

9247
public synchronized void remove( BoltServerAddress address )

driver/src/main/java/org/neo4j/driver/internal/cluster/ClusterRoutingTable.java

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
package org.neo4j.driver.internal.cluster;
2121

22+
import java.util.Collections;
2223
import java.util.HashSet;
2324
import java.util.LinkedHashSet;
2425
import java.util.Set;
@@ -43,7 +44,7 @@ public class ClusterRoutingTable implements RoutingTable
4344
public ClusterRoutingTable( Clock clock, BoltServerAddress... routingAddresses )
4445
{
4546
this( clock );
46-
routers.update( new LinkedHashSet<>( asList( routingAddresses ) ), new HashSet<BoltServerAddress>() );
47+
routers.update( new LinkedHashSet<>( asList( routingAddresses ) ) );
4748
}
4849

4950
private ClusterRoutingTable( Clock clock )
@@ -66,14 +67,12 @@ public boolean isStaleFor( AccessMode mode )
6667
}
6768

6869
@Override
69-
public synchronized Set<BoltServerAddress> update( ClusterComposition cluster )
70+
public synchronized void update( ClusterComposition cluster )
7071
{
7172
expirationTimeout = cluster.expirationTimestamp();
72-
Set<BoltServerAddress> removed = new HashSet<>();
73-
readers.update( cluster.readers(), removed );
74-
writers.update( cluster.writers(), removed );
75-
routers.update( cluster.routers(), removed );
76-
return removed;
73+
readers.update( cluster.readers() );
74+
writers.update( cluster.writers() );
75+
routers.update( cluster.routers() );
7776
}
7877

7978
@Override
@@ -102,6 +101,16 @@ public AddressSet routers()
102101
return routers;
103102
}
104103

104+
@Override
105+
public Set<BoltServerAddress> servers()
106+
{
107+
Set<BoltServerAddress> servers = new HashSet<>();
108+
Collections.addAll( servers, readers.toArray() );
109+
Collections.addAll( servers, writers.toArray() );
110+
Collections.addAll( servers, routers.toArray() );
111+
return servers;
112+
}
113+
105114
@Override
106115
public void removeWriter( BoltServerAddress toRemove )
107116
{

driver/src/main/java/org/neo4j/driver/internal/cluster/RoutingProcedureRunner.java

Lines changed: 27 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -53,18 +53,9 @@ public CompletionStage<RoutingProcedureResponse> run( CompletionStage<Connection
5353
return connectionStage.thenCompose( connection ->
5454
{
5555
Statement procedure = procedureStatement( connection.serverVersion() );
56-
return runProcedure( connection, procedure ).handle( ( records, error ) ->
57-
{
58-
Throwable cause = Futures.completionErrorCause( error );
59-
if ( cause != null )
60-
{
61-
return handleError( procedure, cause );
62-
}
63-
else
64-
{
65-
return new RoutingProcedureResponse( procedure, records );
66-
}
67-
} );
56+
return runProcedure( connection, procedure )
57+
.thenCompose( records -> releaseConnection( connection, records ) )
58+
.handle( ( records, error ) -> processProcedureResponse( procedure, records, error ) );
6859
} );
6960
}
7061

@@ -87,6 +78,30 @@ private Statement procedureStatement( ServerVersion serverVersion )
8778
}
8879
}
8980

81+
private CompletionStage<List<Record>> releaseConnection( Connection connection, List<Record> records )
82+
{
83+
// It is not strictly required to release connection after routing procedure invocation because it'll
84+
// be released by the PULL_ALL response handler after result is fully fetched. Such release will happen
85+
// in background. However, releasing it early as part of whole chain makes it easier to reason about
86+
// rediscovery in stub server tests. Some of them assume connections to instances not present in new
87+
// routing table will be closed immediately.
88+
return connection.release().thenApply( ignore -> records );
89+
}
90+
91+
private RoutingProcedureResponse processProcedureResponse( Statement procedure, List<Record> records,
92+
Throwable error )
93+
{
94+
Throwable cause = Futures.completionErrorCause( error );
95+
if ( cause != null )
96+
{
97+
return handleError( procedure, cause );
98+
}
99+
else
100+
{
101+
return new RoutingProcedureResponse( procedure, records );
102+
}
103+
}
104+
90105
private RoutingProcedureResponse handleError( Statement procedure, Throwable error )
91106
{
92107
if ( error instanceof ClientException )

driver/src/main/java/org/neo4j/driver/internal/cluster/RoutingTable.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ public interface RoutingTable
2727
{
2828
boolean isStaleFor( AccessMode mode );
2929

30-
Set<BoltServerAddress> update( ClusterComposition cluster );
30+
void update( ClusterComposition cluster );
3131

3232
void forget( BoltServerAddress address );
3333

@@ -37,5 +37,7 @@ public interface RoutingTable
3737

3838
AddressSet routers();
3939

40+
Set<BoltServerAddress> servers();
41+
4042
void removeWriter( BoltServerAddress toRemove );
4143
}

driver/src/main/java/org/neo4j/driver/internal/cluster/loadbalancing/LoadBalancer.java

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020

2121
import io.netty.util.concurrent.EventExecutorGroup;
2222

23-
import java.util.Set;
2423
import java.util.concurrent.CompletableFuture;
2524
import java.util.concurrent.CompletionStage;
2625

@@ -125,10 +124,8 @@ public CompletionStage<Void> close()
125124

126125
private synchronized void forget( BoltServerAddress address )
127126
{
128-
// First remove from the load balancer, to prevent concurrent threads from making connections to them.
127+
// remove from the routing table, to prevent concurrent threads from making connections to this address
129128
routingTable.forget( address );
130-
// drop all current connections to the address
131-
connectionPool.purge( address );
132129
}
133130

134131
private synchronized CompletionStage<RoutingTable> freshRoutingTable( AccessMode mode )
@@ -171,18 +168,21 @@ else if ( routingTable.isStaleFor( mode ) )
171168

172169
private synchronized void freshClusterCompositionFetched( ClusterComposition composition )
173170
{
174-
Set<BoltServerAddress> removed = routingTable.update( composition );
175-
176-
for ( BoltServerAddress address : removed )
171+
try
177172
{
178-
connectionPool.purge( address );
179-
}
173+
routingTable.update( composition );
174+
connectionPool.retainAll( routingTable.servers() );
180175

181-
log.info( "Refreshed routing information. %s", routingTable );
176+
log.info( "Refreshed routing information. %s", routingTable );
182177

183-
CompletableFuture<RoutingTable> routingTableFuture = refreshRoutingTableFuture;
184-
refreshRoutingTableFuture = null;
185-
routingTableFuture.complete( routingTable );
178+
CompletableFuture<RoutingTable> routingTableFuture = refreshRoutingTableFuture;
179+
refreshRoutingTableFuture = null;
180+
routingTableFuture.complete( routingTable );
181+
}
182+
catch ( Throwable error )
183+
{
184+
clusterCompositionLookupFailed( error );
185+
}
186186
}
187187

188188
private synchronized void clusterCompositionLookupFailed( Throwable error )

driver/src/main/java/org/neo4j/driver/internal/handlers/ResetResponseHandler.java

Lines changed: 7 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,10 @@
2020

2121
import io.netty.channel.Channel;
2222
import io.netty.channel.pool.ChannelPool;
23-
import io.netty.util.concurrent.Promise;
23+
import io.netty.util.concurrent.Future;
2424

2525
import java.util.Map;
26+
import java.util.concurrent.CompletableFuture;
2627

2728
import org.neo4j.driver.internal.async.inbound.InboundMessageDispatcher;
2829
import org.neo4j.driver.internal.spi.ResponseHandler;
@@ -37,16 +38,16 @@ public class ResetResponseHandler implements ResponseHandler
3738
private final ChannelPool pool;
3839
private final InboundMessageDispatcher messageDispatcher;
3940
private final Clock clock;
40-
private final Promise<Void> releasePromise;
41+
private final CompletableFuture<Void> releaseFuture;
4142

4243
public ResetResponseHandler( Channel channel, ChannelPool pool, InboundMessageDispatcher messageDispatcher,
43-
Clock clock, Promise<Void> releasePromise )
44+
Clock clock, CompletableFuture<Void> releaseFuture )
4445
{
4546
this.channel = channel;
4647
this.pool = pool;
4748
this.messageDispatcher = messageDispatcher;
4849
this.clock = clock;
49-
this.releasePromise = releasePromise;
50+
this.releaseFuture = releaseFuture;
5051
}
5152

5253
@Override
@@ -72,13 +73,7 @@ private void releaseChannel()
7273
messageDispatcher.unMuteAckFailure();
7374
setLastUsedTimestamp( channel, clock.millis() );
7475

75-
if ( releasePromise == null )
76-
{
77-
pool.release( channel );
78-
}
79-
else
80-
{
81-
pool.release( channel, releasePromise );
82-
}
76+
Future<Void> released = pool.release( channel );
77+
released.addListener( ignore -> releaseFuture.complete( null ) );
8378
}
8479
}

0 commit comments

Comments
 (0)