Skip to content

Commit d844095

Browse files
committed
Fixed AccessMode confusion in session
`NetworkSession` acquires connections from the connection pool for the given access mode. Previously it tried to reuse existing connection when available. However, it did not pay attention to access mode when reusing connections. This could result in READ connection being used for WRITE operation and vice versa. This commit fixes the problem by making session buffer existing result, if any. Buffering means that in-use connection will be released back to the pool. It also allows session propagate unconsumed failures from previous query executions.
1 parent b5e7437 commit d844095

File tree

13 files changed

+342
-203
lines changed

13 files changed

+342
-203
lines changed

driver/src/main/java/org/neo4j/driver/internal/ExplicitTransaction.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import org.neo4j.driver.internal.spi.Connection;
3636
import org.neo4j.driver.internal.spi.ResponseHandler;
3737
import org.neo4j.driver.internal.types.InternalTypeSystem;
38+
import org.neo4j.driver.internal.util.Futures;
3839
import org.neo4j.driver.v1.Record;
3940
import org.neo4j.driver.v1.Session;
4041
import org.neo4j.driver.v1.Statement;
@@ -115,7 +116,17 @@ public CompletionStage<ExplicitTransaction> beginAsync( Bookmark initialBookmark
115116
CompletableFuture<ExplicitTransaction> beginFuture = new CompletableFuture<>();
116117
connection.runAndFlush( BEGIN_QUERY, initialBookmark.asBeginTransactionParameters(),
117118
NoOpResponseHandler.INSTANCE, new BeginTxResponseHandler<>( beginFuture, this ) );
118-
return beginFuture;
119+
120+
return beginFuture.handle( ( tx, beginError ) ->
121+
{
122+
if ( beginError != null )
123+
{
124+
// release connection if begin failed, transaction can't be started
125+
connection.releaseNow();
126+
throw new CompletionException( Futures.completionErrorCause( beginError ) );
127+
}
128+
return tx;
129+
} );
119130
}
120131
}
121132

driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java

Lines changed: 66 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626

2727
import org.neo4j.driver.internal.async.InternalStatementResultCursor;
2828
import org.neo4j.driver.internal.async.QueryRunner;
29-
import org.neo4j.driver.internal.async.ResultCursorsHolder;
3029
import org.neo4j.driver.internal.logging.DelegatingLogger;
3130
import org.neo4j.driver.internal.retry.RetryLogic;
3231
import org.neo4j.driver.internal.spi.Connection;
@@ -60,12 +59,12 @@ public class NetworkSession implements Session
6059
private final ConnectionProvider connectionProvider;
6160
private final AccessMode mode;
6261
private final RetryLogic retryLogic;
63-
private final ResultCursorsHolder resultCursors;
6462
protected final Logger logger;
6563

6664
private volatile Bookmark bookmark = Bookmark.empty();
6765
private volatile CompletionStage<ExplicitTransaction> transactionStage = completedFuture( null );
6866
private volatile CompletionStage<Connection> connectionStage = completedFuture( null );
67+
private volatile CompletionStage<InternalStatementResultCursor> resultCursorStage = completedFuture( null );
6968

7069
private final AtomicBoolean open = new AtomicBoolean( true );
7170

@@ -75,7 +74,6 @@ public NetworkSession( ConnectionProvider connectionProvider, AccessMode mode, R
7574
this.connectionProvider = connectionProvider;
7675
this.mode = mode;
7776
this.retryLogic = retryLogic;
78-
this.resultCursors = new ResultCursorsHolder();
7977
this.logger = new DelegatingLogger( logging.getLog( LOG_NAME ), String.valueOf( hashCode() ) );
8078
}
8179

@@ -163,22 +161,28 @@ public CompletionStage<Void> closeAsync()
163161
{
164162
if ( open.compareAndSet( true, false ) )
165163
{
166-
return resultCursors.retrieveNotConsumedError()
167-
.thenCompose( error -> releaseResources().thenApply( ignore ->
168-
{
169-
Throwable queryError = Futures.completionErrorCause( error );
170-
if ( queryError != null )
171-
{
172-
// connection has been acquired and there is an unconsumed error in result cursor
173-
throw new CompletionException( queryError );
174-
}
175-
else
176-
{
177-
// either connection acquisition failed or
178-
// there are no unconsumed errors in the result cursor
179-
return null;
180-
}
181-
} ) );
164+
return resultCursorStage.thenCompose( cursor ->
165+
{
166+
if ( cursor == null )
167+
{
168+
return completedFuture( null );
169+
}
170+
return cursor.failureAsync();
171+
} ).thenCompose( error -> releaseResources().thenApply( ignore ->
172+
{
173+
Throwable queryError = Futures.completionErrorCause( error );
174+
if ( queryError != null )
175+
{
176+
// connection has been acquired and there is an unconsumed error in result cursor
177+
throw new CompletionException( queryError );
178+
}
179+
else
180+
{
181+
// either connection acquisition failed or
182+
// there are no unconsumed errors in the result cursor
183+
return null;
184+
}
185+
} ) );
182186
}
183187
return completedFuture( null );
184188
}
@@ -275,7 +279,7 @@ CompletionStage<Boolean> currentConnectionIsOpen()
275279
return connectionStage.handle( ( connection, error ) ->
276280
error == null && // no acquisition error
277281
connection != null && // some connection has actually been acquired
278-
connection.isInUse() ); // and it's still being used
282+
connection.isOpen() ); // and it's still open
279283
}
280284

281285
private <T> T transaction( AccessMode mode, TransactionWork<T> work )
@@ -412,7 +416,7 @@ private CompletionStage<InternalStatementResultCursor> runAsync( Statement state
412416
{
413417
ensureSessionIsOpen();
414418

415-
CompletionStage<InternalStatementResultCursor> cursorStage = ensureNoOpenTxBeforeRunningQuery()
419+
CompletionStage<InternalStatementResultCursor> newResultCursorStage = ensureNoOpenTxBeforeRunningQuery()
416420
.thenCompose( ignore -> acquireConnection( mode ) )
417421
.thenCompose( connection ->
418422
{
@@ -426,8 +430,9 @@ private CompletionStage<InternalStatementResultCursor> runAsync( Statement state
426430
}
427431
} );
428432

429-
resultCursors.add( cursorStage );
430-
return cursorStage;
433+
resultCursorStage = newResultCursorStage.exceptionally( error -> null );
434+
435+
return newResultCursorStage;
431436
}
432437

433438
private CompletionStage<ExplicitTransaction> beginTransactionAsync( AccessMode mode )
@@ -447,28 +452,46 @@ private CompletionStage<ExplicitTransaction> beginTransactionAsync( AccessMode m
447452

448453
private CompletionStage<Connection> acquireConnection( AccessMode mode )
449454
{
450-
// memorize in local so same instance is transformed and used in callbacks
451-
CompletionStage<Connection> currentAsyncConnectionStage = connectionStage;
455+
CompletionStage<Connection> currentConnectionStage = connectionStage;
452456

453-
connectionStage = currentAsyncConnectionStage
454-
.exceptionally( error -> null ) // handle previous acquisition failures
455-
.thenCompose( connection ->
456-
{
457-
if ( connection != null && connection.tryMarkInUse() )
458-
{
459-
// previous acquisition attempt was successful and connection has not been released yet
460-
// continue using same connection
461-
return currentAsyncConnectionStage;
462-
}
463-
else
464-
{
465-
// previous acquisition attempt failed or connection has been released
466-
// acquire new connection
467-
return connectionProvider.acquireConnection( mode );
468-
}
469-
} );
457+
CompletionStage<Connection> newConnectionStage = resultCursorStage.thenCompose( cursor ->
458+
{
459+
if ( cursor == null )
460+
{
461+
return completedFuture( null );
462+
}
463+
// make sure previous result is fully consumed and connection is released back to the pool
464+
return cursor.failureAsync();
465+
} ).thenCompose( error ->
466+
{
467+
if ( error == null )
468+
{
469+
// there is no unconsumed error, so one of the following is true:
470+
// 1) this is first time connection is acquired in this session
471+
// 2) previous result has been successful and is fully consumed
472+
// 3) previous result failed and error has been consumed
473+
474+
// return existing connection, which should've been released back to the pool by now
475+
return currentConnectionStage.exceptionally( ignore -> null );
476+
}
477+
else
478+
{
479+
// there exists unconsumed error, re-throw it
480+
throw new CompletionException( error );
481+
}
482+
} ).thenCompose( existingConnection ->
483+
{
484+
if ( existingConnection != null && existingConnection.isOpen() )
485+
{
486+
// there somehow is an existing open connection, this should not happen, just a precondition
487+
throw new IllegalStateException( "Existing open connection detected" );
488+
}
489+
return connectionProvider.acquireConnection( mode );
490+
} );
491+
492+
connectionStage = newConnectionStage.exceptionally( error -> null );
470493

471-
return connectionStage;
494+
return newConnectionStage;
472495
}
473496

474497
private CompletionStage<Void> releaseResources()

driver/src/main/java/org/neo4j/driver/internal/async/NettyConnection.java

Lines changed: 5 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -50,10 +50,9 @@ public class NettyConnection implements Connection
5050
private final ChannelPool channelPool;
5151
private final Clock clock;
5252

53+
private final AtomicBoolean open = new AtomicBoolean( true );
5354
private final AtomicBoolean autoReadEnabled = new AtomicBoolean( true );
5455

55-
private final NettyConnectionState state = new NettyConnectionState();
56-
5756
public NettyConnection( Channel channel, ChannelPool channelPool, Clock clock )
5857
{
5958
this.channel = channel;
@@ -63,15 +62,9 @@ public NettyConnection( Channel channel, ChannelPool channelPool, Clock clock )
6362
}
6463

6564
@Override
66-
public boolean isInUse()
67-
{
68-
return state.isInUse();
69-
}
70-
71-
@Override
72-
public boolean tryMarkInUse()
65+
public boolean isOpen()
7366
{
74-
return state.markInUse();
67+
return open.get();
7568
}
7669

7770
@Override
@@ -109,7 +102,7 @@ public void runAndFlush( String statement, Map<String,Value> parameters, Respons
109102
@Override
110103
public void releaseInBackground()
111104
{
112-
if ( state.release() )
105+
if ( open.compareAndSet( true, false ) )
113106
{
114107
reset( new ResetResponseHandler( channel, channelPool, messageDispatcher, clock ) );
115108
}
@@ -118,7 +111,7 @@ public void releaseInBackground()
118111
@Override
119112
public CompletionStage<Void> releaseNow()
120113
{
121-
if ( state.forceRelease() )
114+
if ( open.compareAndSet( true, false ) )
122115
{
123116
Promise<Void> releasePromise = channel.eventLoop().newPromise();
124117
reset( new ResetResponseHandler( channel, channelPool, messageDispatcher, clock, releasePromise ) );

driver/src/main/java/org/neo4j/driver/internal/async/RoutingConnection.java

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -41,12 +41,6 @@ public RoutingConnection( Connection delegate, AccessMode accessMode, RoutingErr
4141
this.errorHandler = errorHandler;
4242
}
4343

44-
@Override
45-
public boolean tryMarkInUse()
46-
{
47-
return delegate.tryMarkInUse();
48-
}
49-
5044
@Override
5145
public void enableAutoRead()
5246
{
@@ -82,9 +76,9 @@ public void releaseInBackground()
8276
}
8377

8478
@Override
85-
public boolean isInUse()
79+
public boolean isOpen()
8680
{
87-
return delegate.isInUse();
81+
return delegate.isOpen();
8882
}
8983

9084
@Override

driver/src/main/java/org/neo4j/driver/internal/spi/Connection.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,7 @@
2727

2828
public interface Connection
2929
{
30-
boolean isInUse();
31-
32-
boolean tryMarkInUse();
30+
boolean isOpen();
3331

3432
void enableAutoRead();
3533

driver/src/test/java/org/neo4j/driver/internal/ExplicitTransactionTest.java

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,14 +21,20 @@
2121
import org.junit.Test;
2222
import org.mockito.InOrder;
2323

24+
import java.util.function.Consumer;
25+
2426
import org.neo4j.driver.internal.spi.Connection;
27+
import org.neo4j.driver.internal.spi.ResponseHandler;
2528
import org.neo4j.driver.v1.Transaction;
2629

30+
import static java.util.Collections.emptyMap;
2731
import static org.junit.Assert.assertEquals;
2832
import static org.junit.Assert.assertFalse;
2933
import static org.junit.Assert.assertTrue;
34+
import static org.junit.Assert.fail;
3035
import static org.mockito.Matchers.any;
3136
import static org.mockito.Matchers.eq;
37+
import static org.mockito.Mockito.doAnswer;
3238
import static org.mockito.Mockito.inOrder;
3339
import static org.mockito.Mockito.mock;
3440
import static org.mockito.Mockito.never;
@@ -220,6 +226,36 @@ public void shouldNotOverwriteBookmarkWithEmptyBookmark()
220226
assertEquals( "Cat", tx.bookmark().maxBookmarkAsString() );
221227
}
222228

229+
@Test
230+
public void shouldReleaseConnectionWhenBeginFails()
231+
{
232+
RuntimeException error = new RuntimeException( "Wrong bookmark!" );
233+
Connection connection = connectionWithBegin( handler -> handler.onFailure( error ) );
234+
ExplicitTransaction tx = new ExplicitTransaction( connection, mock( NetworkSession.class ) );
235+
236+
try
237+
{
238+
getBlocking( tx.beginAsync( Bookmark.from( "SomeBookmark" ) ) );
239+
fail( "Exception expected" );
240+
}
241+
catch ( RuntimeException e )
242+
{
243+
assertEquals( error, e );
244+
}
245+
246+
verify( connection ).releaseNow();
247+
}
248+
249+
@Test
250+
public void shouldNotReleaseConnectionWhenBeginSucceeds()
251+
{
252+
Connection connection = connectionWithBegin( handler -> handler.onSuccess( emptyMap() ) );
253+
ExplicitTransaction tx = new ExplicitTransaction( connection, mock( NetworkSession.class ) );
254+
getBlocking( tx.beginAsync( Bookmark.from( "SomeBookmark" ) ) );
255+
256+
verify( connection, never() ).releaseNow();
257+
}
258+
223259
private static ExplicitTransaction beginTx( Connection connection )
224260
{
225261
return beginTx( connection, Bookmark.empty() );
@@ -236,4 +272,18 @@ private static ExplicitTransaction beginTx( Connection connection, NetworkSessio
236272
ExplicitTransaction tx = new ExplicitTransaction( connection, session );
237273
return getBlocking( tx.beginAsync( initialBookmark ) );
238274
}
275+
276+
private static Connection connectionWithBegin( Consumer<ResponseHandler> beginBehaviour )
277+
{
278+
Connection connection = mock( Connection.class );
279+
280+
doAnswer( invocation ->
281+
{
282+
ResponseHandler beginHandler = invocation.getArgumentAt( 3, ResponseHandler.class );
283+
beginBehaviour.accept( beginHandler );
284+
return null;
285+
} ).when( connection ).runAndFlush( eq( "BEGIN" ), any(), any(), any() );
286+
287+
return connection;
288+
}
239289
}

driver/src/test/java/org/neo4j/driver/internal/LeakLoggingNetworkSessionTest.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -95,24 +95,24 @@ private static void finalize( Session session ) throws Exception
9595
finalizeMethod.invoke( session );
9696
}
9797

98-
private static LeakLoggingNetworkSession newSession( Logging logging, boolean inUseConnection )
98+
private static LeakLoggingNetworkSession newSession( Logging logging, boolean openConnection )
9999
{
100-
return new LeakLoggingNetworkSession( connectionProviderMock( inUseConnection ), READ,
100+
return new LeakLoggingNetworkSession( connectionProviderMock( openConnection ), READ,
101101
new FixedRetryLogic( 0 ), logging );
102102
}
103103

104-
private static ConnectionProvider connectionProviderMock( boolean inUseConnection )
104+
private static ConnectionProvider connectionProviderMock( boolean openConnection )
105105
{
106106
ConnectionProvider provider = mock( ConnectionProvider.class );
107-
Connection connection = connectionMock( inUseConnection );
107+
Connection connection = connectionMock( openConnection );
108108
when( provider.acquireConnection( any( AccessMode.class ) ) ).thenReturn( completedFuture( connection ) );
109109
return provider;
110110
}
111111

112-
private static Connection connectionMock( boolean inUse )
112+
private static Connection connectionMock( boolean open )
113113
{
114114
Connection connection = mock( Connection.class );
115-
when( connection.isInUse() ).thenReturn( inUse );
115+
when( connection.isOpen() ).thenReturn( open );
116116
return connection;
117117
}
118118
}

0 commit comments

Comments
 (0)