Skip to content

Commit 6efeb84

Browse files
committed
Improve disposal of broken connections
Connection pool can discard broken connections during acquisition (when connection liveness check timeout is configured) and when connections are returned to the pool. In both cases connections should be disposed and removed from the set of active connections. This is especially important with least connected load balancing strategy which examines amount of active connections for each address. This commit makes sure broken connections are disposed through the connections queue to make sure active set is always updated.
1 parent e41cab2 commit 6efeb84

File tree

6 files changed

+181
-3
lines changed

6 files changed

+181
-3
lines changed

driver/src/main/java/org/neo4j/driver/internal/net/pooling/BlockingPooledConnectionQueue.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,12 @@ public PooledConnection acquire( Supplier<PooledConnection> supplier )
107107
return connection;
108108
}
109109

110+
void disposeBroken( PooledConnection connection )
111+
{
112+
acquiredConnections.remove( connection );
113+
disposeSafely( connection );
114+
}
115+
110116
public boolean isEmpty()
111117
{
112118
return queue.isEmpty();
@@ -163,7 +169,7 @@ private void disposeSafely( PooledConnection connection )
163169
}
164170
catch ( Throwable disposeError )
165171
{
166-
logger.warn( "Error disposing connection during termination", disposeError );
172+
logger.warn( "Error disposing connection", disposeError );
167173
}
168174
}
169175

driver/src/main/java/org/neo4j/driver/internal/net/pooling/PooledConnectionReleaseConsumer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ public void accept( PooledConnection pooledConnection )
4747
}
4848
else
4949
{
50-
pooledConnection.dispose();
50+
connections.disposeBroken( pooledConnection );
5151
}
5252
}
5353
}

driver/src/main/java/org/neo4j/driver/internal/net/pooling/SocketConnectionPool.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -142,10 +142,16 @@ private PooledConnection acquireConnection( BoltServerAddress address,
142142
{
143143
ConnectionSupplier connectionSupplier = new ConnectionSupplier( connectionQueue, address );
144144

145-
PooledConnection connection;
145+
PooledConnection connection = null;
146146
boolean connectionCreated;
147147
do
148148
{
149+
// dispose previous connection that can't be acquired
150+
if ( connection != null )
151+
{
152+
connectionQueue.disposeBroken( connection );
153+
}
154+
149155
connection = connectionQueue.acquire( connectionSupplier );
150156
connectionCreated = connectionSupplier.connectionCreated();
151157
}

driver/src/test/java/org/neo4j/driver/internal/net/pooling/BlockingPooledConnectionQueueTest.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -285,6 +285,21 @@ public void shouldReportActiveConnections()
285285
assertEquals( 0, queue.activeConnections() );
286286
}
287287

288+
@Test
289+
@SuppressWarnings( "unchecked" )
290+
public void shouldDisposeBrokenConnections()
291+
{
292+
BlockingPooledConnectionQueue queue = newConnectionQueue( 5 );
293+
294+
queue.offer( mock( PooledConnection.class ) );
295+
PooledConnection connection = queue.acquire( mock( Supplier.class ) );
296+
assertEquals( 1, queue.activeConnections() );
297+
298+
queue.disposeBroken( connection );
299+
assertEquals( 0, queue.activeConnections() );
300+
verify( connection ).dispose();
301+
}
302+
288303
private static BlockingPooledConnectionQueue newConnectionQueue( int capacity )
289304
{
290305
return newConnectionQueue( capacity, mock( Logging.class, RETURNS_MOCKS ) );
Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
/*
2+
* Copyright (c) 2002-2017 "Neo Technology,"
3+
* Network Engine for Objects in Lund AB [http://neotechnology.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package org.neo4j.driver.internal.net.pooling;
20+
21+
import org.junit.Test;
22+
23+
import org.neo4j.driver.internal.spi.ConnectionPool;
24+
import org.neo4j.driver.internal.spi.PooledConnection;
25+
import org.neo4j.driver.internal.util.Supplier;
26+
27+
import static org.junit.Assert.assertEquals;
28+
import static org.mockito.Mockito.mock;
29+
import static org.mockito.Mockito.verify;
30+
import static org.mockito.Mockito.when;
31+
import static org.neo4j.driver.internal.logging.DevNullLogging.DEV_NULL_LOGGING;
32+
import static org.neo4j.driver.internal.net.BoltServerAddress.LOCAL_DEFAULT;
33+
34+
public class PooledConnectionReleaseConsumerTest
35+
{
36+
@Test
37+
public void shouldOfferReusableConnectionsBackToTheConnectionsQueue()
38+
{
39+
BlockingPooledConnectionQueue queue = newConnectionQueue();
40+
PooledConnection connection = acquireConnection( queue );
41+
42+
PooledConnectionValidator validator = newConnectionValidator( true );
43+
PooledConnectionReleaseConsumer releaseConsumer = new PooledConnectionReleaseConsumer( queue, validator );
44+
45+
releaseConsumer.accept( connection );
46+
47+
// connection should now be idle
48+
assertEquals( 0, queue.activeConnections() );
49+
assertEquals( 1, queue.size() );
50+
51+
verify( connection ).reset();
52+
verify( connection ).sync();
53+
}
54+
55+
@Test
56+
public void shouldAskConnectionsQueueToDisposeNotReusableConnections()
57+
{
58+
BlockingPooledConnectionQueue queue = newConnectionQueue();
59+
PooledConnection connection = acquireConnection( queue );
60+
61+
PooledConnectionValidator validator = newConnectionValidator( false );
62+
PooledConnectionReleaseConsumer releaseConsumer = new PooledConnectionReleaseConsumer( queue, validator );
63+
64+
releaseConsumer.accept( connection );
65+
66+
// connection should've been disposed
67+
assertEquals( 0, queue.activeConnections() );
68+
assertEquals( 0, queue.size() );
69+
70+
verify( connection ).dispose();
71+
}
72+
73+
private static BlockingPooledConnectionQueue newConnectionQueue()
74+
{
75+
return new BlockingPooledConnectionQueue( LOCAL_DEFAULT, 5, DEV_NULL_LOGGING );
76+
}
77+
78+
@SuppressWarnings( "unchecked" )
79+
private static PooledConnection acquireConnection( BlockingPooledConnectionQueue queue )
80+
{
81+
queue.offer( newConnectionMock() );
82+
PooledConnection connection = queue.acquire( mock( Supplier.class ) );
83+
assertEquals( 1, queue.activeConnections() );
84+
return connection;
85+
}
86+
87+
private static PooledConnectionValidator newConnectionValidator( boolean allowsConnections )
88+
{
89+
ConnectionPool pool = mock( ConnectionPool.class );
90+
when( pool.hasAddress( LOCAL_DEFAULT ) ).thenReturn( allowsConnections );
91+
return new PooledConnectionValidator( pool );
92+
}
93+
94+
private static PooledConnection newConnectionMock()
95+
{
96+
PooledConnection connection = mock( PooledConnection.class );
97+
when( connection.boltServerAddress() ).thenReturn( LOCAL_DEFAULT );
98+
return connection;
99+
}
100+
}

driver/src/test/java/org/neo4j/driver/internal/net/pooling/SocketConnectionPoolTest.java

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,9 @@
2424
import org.mockito.stubbing.Answer;
2525

2626
import java.util.ArrayList;
27+
import java.util.Collections;
2728
import java.util.List;
29+
import java.util.Map;
2830
import java.util.Set;
2931
import java.util.concurrent.Callable;
3032
import java.util.concurrent.ConcurrentHashMap;
@@ -42,6 +44,7 @@
4244
import org.neo4j.driver.internal.util.Clock;
4345
import org.neo4j.driver.internal.util.FakeClock;
4446
import org.neo4j.driver.v1.Logging;
47+
import org.neo4j.driver.v1.Value;
4548

4649
import static java.util.Collections.newSetFromMap;
4750
import static org.hamcrest.Matchers.instanceOf;
@@ -57,6 +60,7 @@
5760
import static org.mockito.Matchers.any;
5861
import static org.mockito.Mockito.RETURNS_MOCKS;
5962
import static org.mockito.Mockito.doNothing;
63+
import static org.mockito.Mockito.doThrow;
6064
import static org.mockito.Mockito.inOrder;
6165
import static org.mockito.Mockito.mock;
6266
import static org.mockito.Mockito.never;
@@ -65,6 +69,7 @@
6569
import static org.mockito.Mockito.when;
6670
import static org.neo4j.driver.internal.net.BoltServerAddress.DEFAULT_PORT;
6771
import static org.neo4j.driver.internal.net.BoltServerAddress.LOCAL_DEFAULT;
72+
import static org.neo4j.driver.v1.Values.value;
6873

6974
public class SocketConnectionPoolTest
7075
{
@@ -536,6 +541,52 @@ public void reportActiveConnectionsWhenHasIdleConnections()
536541
assertEquals( 0, pool.activeConnections( ADDRESS_1 ) );
537542
}
538543

544+
@Test
545+
public void shouldForgetBrokenIdleConnection()
546+
{
547+
Connection connection1 = newConnectionMock( ADDRESS_1 );
548+
Connection connection2 = newConnectionMock( ADDRESS_1 );
549+
550+
doNothing().doThrow( new RuntimeException() ).when( connection1 ).reset();
551+
552+
int idleTimeBeforeConnectionTest = 42;
553+
FakeClock clock = new FakeClock();
554+
Connector connector = newMockConnector( connection1, connection2 );
555+
SocketConnectionPool pool = newPool( connector, clock, idleTimeBeforeConnectionTest );
556+
557+
// acquire and release one connection
558+
pool.acquire( ADDRESS_1 ).close();
559+
// make this connection seem idle for too long
560+
clock.progress( idleTimeBeforeConnectionTest + 42 );
561+
562+
PooledConnection acquiredConnection = pool.acquire( ADDRESS_1 );
563+
564+
Map<String,Value> auth = Collections.singletonMap( "Key", value( "Value" ) );
565+
acquiredConnection.init( "DummyClient", auth );
566+
verify( connection1, never() ).init( "DummyClient", auth );
567+
verify( connection2 ).init( "DummyClient", auth );
568+
569+
assertEquals( 1, pool.activeConnections( ADDRESS_1 ) );
570+
acquiredConnection.close();
571+
assertEquals( 0, pool.activeConnections( ADDRESS_1 ) );
572+
}
573+
574+
@Test
575+
public void shouldForgetIdleConnection()
576+
{
577+
Connection connection = newConnectionMock( ADDRESS_1 );
578+
doThrow( new RuntimeException() ).when( connection ).reset();
579+
580+
SocketConnectionPool pool = newPool( newMockConnector( connection ), new FakeClock(), 42 );
581+
PooledConnection pooledConnection = pool.acquire( ADDRESS_1 );
582+
583+
// release the connection, it should fail to reset and be disposed
584+
pooledConnection.close();
585+
586+
assertEquals( 0, pool.activeConnections( ADDRESS_1 ) );
587+
verify( connection ).close();
588+
}
589+
539590
private static Answer<Connection> createConnectionAnswer( final Set<Connection> createdConnections )
540591
{
541592
return new Answer<Connection>()

0 commit comments

Comments
 (0)