Skip to content

Commit db739e9

Browse files
committed
Remove ping on Session#close()
Previously ping query (`RETURN 1`) was executed on each `Session#close()`. This was intended to verify the underlying connection and decide if it should be returned to the pool or not. Running such ping is not quite necessary because we already execute `RESET` and `SYNC` before that. Combination of `RESET` and `SYNC` guarantees to send at least one message and thus validates the connection. Also query `RETURN 1` made database start transaction and possibly do planning, this work seem unnecessary. This commit removes ping and adds tests to verify that we actually send messages to socket during pooled connection validation.
1 parent e57e38d commit db739e9

15 files changed

+151
-86
lines changed

driver/src/main/java/org/neo4j/driver/internal/net/SocketConnection.java

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -50,8 +50,8 @@ public class SocketConnection implements Connection
5050
{
5151
private final Queue<Message> pendingMessages = new LinkedList<>();
5252
private final SocketResponseHandler responseHandler;
53-
private AtomicBoolean isInterrupted = new AtomicBoolean( false );
54-
private AtomicBoolean isAckFailureMuted = new AtomicBoolean( false );
53+
private final AtomicBoolean isInterrupted = new AtomicBoolean( false );
54+
private final AtomicBoolean isAckFailureMuted = new AtomicBoolean( false );
5555
private InternalServerInfo serverInfo;
5656

5757
private final SocketClient socket;
@@ -66,10 +66,19 @@ public SocketConnection( BoltServerAddress address, SecurityPlan securityPlan, L
6666
this.socket.start();
6767
}
6868

69-
// for mocked socket testing
70-
SocketConnection( SocketClient socket, Logger logger )
69+
/**
70+
* Create new connection backed by the given socket.
71+
* <p>
72+
* <b>Note:</b> this constructor should be used <b>only</b> for testing.
73+
*
74+
* @param socket the socket to use for network interactions.
75+
* @param serverInfo the info about server this connection points to.
76+
* @param logger the logger.
77+
*/
78+
public SocketConnection( SocketClient socket, InternalServerInfo serverInfo, Logger logger )
7179
{
7280
this.socket = socket;
81+
this.serverInfo = serverInfo;
7382
this.logger = logger;
7483
this.responseHandler = createResponseHandler( logger );
7584
this.socket.start();

driver/src/main/java/org/neo4j/driver/internal/net/pooling/PooledConnectionValidator.java

Lines changed: 3 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -18,24 +18,16 @@
1818
*/
1919
package org.neo4j.driver.internal.net.pooling;
2020

21-
import java.util.HashMap;
22-
import java.util.Map;
23-
24-
import org.neo4j.driver.internal.spi.Collector;
2521
import org.neo4j.driver.internal.spi.ConnectionPool;
26-
import org.neo4j.driver.v1.Value;
2722
import org.neo4j.driver.v1.util.Function;
2823

2924
class PooledConnectionValidator implements Function<PooledConnection,Boolean>
3025
{
3126
private final ConnectionPool pool;
32-
private final PoolSettings poolSettings;
33-
private static final Map<String,Value> NO_PARAMETERS = new HashMap<>();
3427

35-
PooledConnectionValidator( ConnectionPool pool, PoolSettings poolSettings )
28+
PooledConnectionValidator( ConnectionPool pool )
3629
{
3730
this.pool = pool;
38-
this.poolSettings = poolSettings;
3931
}
4032

4133
@Override
@@ -45,9 +37,7 @@ public Boolean apply( PooledConnection pooledConnection )
4537
// and we should close the conn without bothering to reset the conn at all
4638
return pool.hasAddress( pooledConnection.boltServerAddress() ) &&
4739
!pooledConnection.hasUnrecoverableErrors() &&
48-
reset( pooledConnection ) &&
49-
(pooledConnection.idleTime() <= poolSettings.idleTimeBeforeConnectionTest() ||
50-
ping( pooledConnection ));
40+
reset( pooledConnection );
5141
}
5242

5343
/**
@@ -57,7 +47,7 @@ public Boolean apply( PooledConnection pooledConnection )
5747
* @param conn the PooledConnection
5848
* @return true if the connection is reset successfully without any error, otherwise false.
5949
*/
60-
private boolean reset( PooledConnection conn )
50+
private static boolean reset( PooledConnection conn )
6151
{
6252
try
6353
{
@@ -70,19 +60,4 @@ private boolean reset( PooledConnection conn )
7060
return false;
7161
}
7262
}
73-
74-
private boolean ping( PooledConnection conn )
75-
{
76-
try
77-
{
78-
conn.run( "RETURN 1 // JavaDriver poll to test connection", NO_PARAMETERS, Collector.NO_OP );
79-
conn.pullAll( Collector.NO_OP );
80-
conn.sync();
81-
return true;
82-
}
83-
catch ( Throwable e )
84-
{
85-
return false;
86-
}
87-
}
8863
}

driver/src/main/java/org/neo4j/driver/internal/net/pooling/SocketConnectionPool.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import java.util.List;
2222
import java.util.Map;
2323
import java.util.concurrent.ConcurrentHashMap;
24-
import java.util.concurrent.atomic.AtomicBoolean;
2524

2625
import org.neo4j.driver.internal.ConnectionSettings;
2726
import org.neo4j.driver.internal.net.BoltServerAddress;
@@ -113,10 +112,11 @@ public Connection acquire( final BoltServerAddress address )
113112
@Override
114113
public PooledConnection get()
115114
{
116-
return new PooledConnection( connect( address ), new
117-
PooledConnectionReleaseConsumer( connections,
118-
new PooledConnectionValidator( SocketConnectionPool.this, poolSettings ) ), clock );
119-
115+
PooledConnectionValidator connectionValidator =
116+
new PooledConnectionValidator( SocketConnectionPool.this );
117+
PooledConnectionReleaseConsumer releaseConsumer =
118+
new PooledConnectionReleaseConsumer( connections, connectionValidator );
119+
return new PooledConnection( connect( address ), releaseConsumer, clock );
120120
}
121121
};
122122
PooledConnection conn = connections.acquire( supplier );

driver/src/test/java/org/neo4j/driver/internal/net/SocketConnectionTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828

2929
import org.neo4j.driver.internal.messaging.Message;
3030
import org.neo4j.driver.internal.messaging.SuccessMessage;
31+
import org.neo4j.driver.internal.summary.InternalServerInfo;
3132
import org.neo4j.driver.v1.Logger;
3233
import org.neo4j.driver.v1.Values;
3334
import org.neo4j.driver.v1.summary.ServerInfo;
@@ -48,7 +49,7 @@ public void shouldReceiveServerInfoAfterInit() throws Throwable
4849
{
4950
// Given
5051
SocketClient socket = mock( SocketClient.class );
51-
SocketConnection conn = new SocketConnection( socket, mock( Logger.class ) );
52+
SocketConnection conn = new SocketConnection( socket, mock( InternalServerInfo.class ), mock( Logger.class ) );
5253

5354
when( socket.address() ).thenReturn( BoltServerAddress.from( URI.create( "http://neo4j.com:9000" ) ) );
5455

driver/src/test/java/org/neo4j/driver/internal/net/pooling/ConnectionInvalidationTest.java

Lines changed: 16 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -23,15 +23,13 @@
2323

2424
import java.io.IOException;
2525
import java.util.HashMap;
26-
import java.util.concurrent.atomic.AtomicBoolean;
2726

2827
import org.neo4j.driver.internal.net.BoltServerAddress;
2928
import org.neo4j.driver.internal.spi.Collector;
3029
import org.neo4j.driver.internal.spi.Connection;
3130
import org.neo4j.driver.internal.spi.ConnectionPool;
3231
import org.neo4j.driver.internal.util.Clock;
3332
import org.neo4j.driver.internal.util.Consumers;
34-
import org.neo4j.driver.v1.Config;
3533
import org.neo4j.driver.v1.Value;
3634
import org.neo4j.driver.v1.exceptions.ClientException;
3735
import org.neo4j.driver.v1.exceptions.Neo4jException;
@@ -51,72 +49,57 @@
5149
import static org.mockito.Mockito.never;
5250
import static org.mockito.Mockito.verify;
5351
import static org.mockito.Mockito.when;
52+
import static org.neo4j.driver.internal.net.BoltServerAddress.LOCAL_DEFAULT;
5453

5554
public class ConnectionInvalidationTest
5655
{
5756
private final Connection delegate = mock( Connection.class );
58-
Clock clock = mock( Clock.class );
57+
private final Clock clock = mock( Clock.class );
5958

6059
private final PooledConnection conn =
6160
new PooledConnection( delegate, Consumers.<PooledConnection>noOp(), Clock.SYSTEM );
6261

6362
@SuppressWarnings( "unchecked" )
6463
@Test
65-
public void shouldInvalidateConnectionThatIsOld() throws Throwable
64+
public void shouldNotInvalidateConnectionThatIsUnableToRun() throws Throwable
6665
{
6766
// Given a connection that's broken
6867
Mockito.doThrow( new ClientException( "That didn't work" ) )
6968
.when( delegate ).run( anyString(), anyMap(), any( Collector.class ) );
70-
PoolSettings poolSettings = PoolSettings.defaultSettings();
71-
when( clock.millis() ).thenReturn( 0L, poolSettings.idleTimeBeforeConnectionTest() + 1L );
7269
PooledConnection conn = new PooledConnection( delegate, Consumers.<PooledConnection>noOp(), clock );
70+
PooledConnectionValidator validator = new PooledConnectionValidator( pool( true ) );
7371

7472
// When/Then
7573
BlockingPooledConnectionQueue
7674
queue = mock( BlockingPooledConnectionQueue.class );
77-
PooledConnectionValidator validator =
78-
new PooledConnectionValidator( pool( true ), poolSettings );
79-
8075
PooledConnectionReleaseConsumer consumer =
81-
new PooledConnectionReleaseConsumer( queue, validator);
76+
new PooledConnectionReleaseConsumer( queue,validator );
8277
consumer.accept( conn );
8378

84-
verify( queue, never() ).offer( conn );
79+
verify( queue ).offer( conn );
8580
}
8681

87-
@SuppressWarnings( "unchecked" )
8882
@Test
89-
public void shouldNotInvalidateConnectionThatIsNotOld() throws Throwable
83+
public void shouldInvalidateConnectionWithUnknownAddress()
9084
{
91-
// Given a connection that's broken
92-
Mockito.doThrow( new ClientException( "That didn't work" ) )
93-
.when( delegate ).run( anyString(), anyMap(), any( Collector.class ) );
94-
Config config = Config.defaultConfig();
95-
PoolSettings poolSettings = PoolSettings.defaultSettings();
96-
when( clock.millis() ).thenReturn( 0L, poolSettings.idleTimeBeforeConnectionTest() - 1L );
97-
PooledConnection conn = new PooledConnection( delegate, Consumers.<PooledConnection>noOp(), clock );
98-
PooledConnectionValidator validator =
99-
new PooledConnectionValidator( pool( true ), poolSettings );
85+
when( delegate.boltServerAddress() ).thenReturn( LOCAL_DEFAULT );
10086

101-
// When/Then
102-
BlockingPooledConnectionQueue
103-
queue = mock( BlockingPooledConnectionQueue.class );
104-
PooledConnectionReleaseConsumer consumer =
105-
new PooledConnectionReleaseConsumer( queue,validator );
87+
BlockingPooledConnectionQueue queue = mock( BlockingPooledConnectionQueue.class );
88+
PooledConnectionValidator validator = new PooledConnectionValidator( pool( false ) );
89+
90+
PooledConnectionReleaseConsumer consumer = new PooledConnectionReleaseConsumer( queue, validator );
10691
consumer.accept( conn );
10792

108-
verify( queue ).offer( conn );
93+
verify( queue, never() ).offer( conn );
10994
}
11095

11196
@Test
11297
public void shouldInvalidConnectionIfFailedToReset() throws Throwable
11398
{
11499
// Given a connection that's broken
115100
Mockito.doThrow( new ClientException( "That didn't work" ) ).when( delegate ).reset();
116-
PoolSettings poolSettings = PoolSettings.defaultSettings();
117101
PooledConnection conn = new PooledConnection( delegate, Consumers.<PooledConnection>noOp(), clock );
118-
PooledConnectionValidator validator =
119-
new PooledConnectionValidator( pool( true ), poolSettings );
102+
PooledConnectionValidator validator = new PooledConnectionValidator( pool( true ) );
120103
// When/Then
121104
BlockingPooledConnectionQueue
122105
queue = mock( BlockingPooledConnectionQueue.class );
@@ -165,9 +148,7 @@ private void assertUnrecoverable( Neo4jException exception )
165148
{
166149
assertThat( e, equalTo( exception ) );
167150
}
168-
PoolSettings poolSettings = PoolSettings.defaultSettings();
169-
PooledConnectionValidator validator =
170-
new PooledConnectionValidator( pool( true ), poolSettings );
151+
PooledConnectionValidator validator = new PooledConnectionValidator( pool( true ) );
171152

172153
// Then
173154
assertTrue( conn.hasUnrecoverableErrors() );
@@ -198,9 +179,7 @@ private void assertRecoverable( Neo4jException exception )
198179

199180
// Then
200181
assertFalse( conn.hasUnrecoverableErrors() );
201-
PoolSettings poolSettings = PoolSettings.defaultSettings();
202-
PooledConnectionValidator validator =
203-
new PooledConnectionValidator( pool( true ), poolSettings );
182+
PooledConnectionValidator validator = new PooledConnectionValidator( pool( true ) );
204183
BlockingPooledConnectionQueue
205184
queue = mock( BlockingPooledConnectionQueue.class );
206185
PooledConnectionReleaseConsumer consumer =
Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
/**
2+
* Copyright (c) 2002-2016 "Neo Technology,"
3+
* Network Engine for Objects in Lund AB [http://neotechnology.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package org.neo4j.driver.internal.net.pooling;
20+
21+
import org.junit.Test;
22+
import org.mockito.ArgumentCaptor;
23+
import org.mockito.InOrder;
24+
25+
import java.io.IOException;
26+
import java.util.Queue;
27+
28+
import org.neo4j.driver.internal.messaging.Message;
29+
import org.neo4j.driver.internal.net.BoltServerAddress;
30+
import org.neo4j.driver.internal.net.SocketClient;
31+
import org.neo4j.driver.internal.net.SocketConnection;
32+
import org.neo4j.driver.internal.spi.Connection;
33+
import org.neo4j.driver.internal.spi.ConnectionPool;
34+
import org.neo4j.driver.internal.summary.InternalServerInfo;
35+
import org.neo4j.driver.internal.util.Clock;
36+
import org.neo4j.driver.internal.util.Consumers;
37+
38+
import static org.junit.Assert.assertEquals;
39+
import static org.junit.Assert.assertTrue;
40+
import static org.mockito.Matchers.any;
41+
import static org.mockito.Mockito.inOrder;
42+
import static org.mockito.Mockito.mock;
43+
import static org.mockito.Mockito.verify;
44+
import static org.mockito.Mockito.when;
45+
import static org.neo4j.driver.internal.logging.DevNullLogger.DEV_NULL_LOGGER;
46+
import static org.neo4j.driver.internal.messaging.ResetMessage.RESET;
47+
import static org.neo4j.driver.internal.net.BoltServerAddress.LOCAL_DEFAULT;
48+
49+
public class PooledConnectionValidatorTest
50+
{
51+
@Test
52+
public void resetAndSyncValidConnection()
53+
{
54+
Connection connection = mock( Connection.class );
55+
PooledConnection pooledConnection = newPooledConnection( connection );
56+
57+
PooledConnectionValidator validator = newValidatorWithMockedPool();
58+
boolean connectionIsValid = validator.apply( pooledConnection );
59+
60+
assertTrue( connectionIsValid );
61+
62+
InOrder inOrder = inOrder( connection );
63+
inOrder.verify( connection ).reset();
64+
inOrder.verify( connection ).sync();
65+
}
66+
67+
@Test
68+
public void sendsSingleResetMessageForValidConnection() throws IOException
69+
{
70+
SocketClient socket = mock( SocketClient.class );
71+
InternalServerInfo serverInfo = new InternalServerInfo( LOCAL_DEFAULT, "v1" );
72+
Connection connection = new SocketConnection( socket, serverInfo, DEV_NULL_LOGGER );
73+
PooledConnection pooledConnection = newPooledConnection( connection );
74+
75+
PooledConnectionValidator validator = newValidatorWithMockedPool();
76+
boolean connectionIsValid = validator.apply( pooledConnection );
77+
78+
assertTrue( connectionIsValid );
79+
80+
ArgumentCaptor<Queue<Message>> captor = messagesCaptor();
81+
verify( socket ).send( captor.capture() );
82+
assertEquals( 1, captor.getAllValues().size() );
83+
Queue<Message> messages = captor.getValue();
84+
assertEquals( 1, messages.size() );
85+
assertEquals( RESET, messages.peek() );
86+
}
87+
88+
private static PooledConnection newPooledConnection( Connection connection )
89+
{
90+
return new PooledConnection( connection, Consumers.<PooledConnection>noOp(), Clock.SYSTEM );
91+
}
92+
93+
private static PooledConnectionValidator newValidatorWithMockedPool()
94+
{
95+
return new PooledConnectionValidator( connectionPoolMock() );
96+
}
97+
98+
private static ConnectionPool connectionPoolMock()
99+
{
100+
ConnectionPool pool = mock( ConnectionPool.class );
101+
when( pool.hasAddress( any( BoltServerAddress.class ) ) ).thenReturn( true );
102+
return pool;
103+
}
104+
105+
@SuppressWarnings( "unchecked" )
106+
private static ArgumentCaptor<Queue<Message>> messagesCaptor()
107+
{
108+
return (ArgumentCaptor) ArgumentCaptor.forClass( Queue.class );
109+
}
110+
}
Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,9 @@
11
!: AUTO INIT
22
!: AUTO RESET
3-
!: AUTO RUN "RETURN 1 // JavaDriver poll to test connection" {}
43
!: AUTO PULL_ALL
54

65
C: RUN "CALL dbms.cluster.routing.getServers" {}
76
PULL_ALL
87
S: SUCCESS {"fields": ["ttl", "servers"]}
98
RECORD [9223372036854775807, [{"addresses": ["127.0.0.1:9007","127.0.0.1:9008"],"role": "WRITE"}, {"addresses": ["127.0.0.1:9005","127.0.0.1:9006"], "role": "READ"},{"addresses": ["127.0.0.1:9001","127.0.0.1:9002","127.0.0.1:9003"], "role": "ROUTE"}]]
10-
SUCCESS {}
9+
SUCCESS {}

driver/src/test/resources/dead_server.script

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
!: AUTO INIT
22
!: AUTO RESET
3-
!: AUTO RUN "RETURN 1 // JavaDriver poll to test connection" {}
43
!: AUTO PULL_ALL
54
!: AUTO RUN "BEGIN" {}
65

0 commit comments

Comments
 (0)