Skip to content

Commit b92eac3

Browse files
authored
Merge pull request #434 from lutovich/1.5-no-block-in-evt-loop
Prohibit blocking operations in IO threads
2 parents 1bfd061 + cc29ca7 commit b92eac3

26 files changed

+640
-221
lines changed

driver/src/main/java/org/neo4j/driver/internal/DriverFactory.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -79,15 +79,15 @@ public final Driver newInstance( URI uri, AuthToken authToken, RoutingSettings r
7979
{
8080
InternalDriver driver = createDriver( uri, address, connectionPool, config, newRoutingSettings,
8181
eventExecutorGroup, securityPlan, retryLogic );
82-
Futures.getBlocking( driver.verifyConnectivity() );
82+
Futures.blockingGet( driver.verifyConnectivity() );
8383
return driver;
8484
}
8585
catch ( Throwable driverError )
8686
{
8787
// we need to close the connection pool if driver creation threw exception
8888
try
8989
{
90-
Futures.getBlocking( connectionPool.close() );
90+
Futures.blockingGet( connectionPool.close() );
9191
}
9292
catch ( Throwable closeError )
9393
{

driver/src/main/java/org/neo4j/driver/internal/ExplicitTransaction.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -48,9 +48,9 @@
4848

4949
import static java.util.Collections.emptyMap;
5050
import static java.util.concurrent.CompletableFuture.completedFuture;
51+
import static org.neo4j.driver.internal.util.Futures.blockingGet;
5152
import static org.neo4j.driver.internal.util.Futures.completionErrorCause;
5253
import static org.neo4j.driver.internal.util.Futures.failedFuture;
53-
import static org.neo4j.driver.internal.util.Futures.getBlocking;
5454
import static org.neo4j.driver.v1.Values.value;
5555

5656
public class ExplicitTransaction implements Transaction
@@ -150,7 +150,7 @@ public void failure()
150150
@Override
151151
public void close()
152152
{
153-
getBlocking( closeAsync() );
153+
blockingGet( closeAsync() );
154154
}
155155

156156
CompletionStage<Void> closeAsync()
@@ -274,7 +274,7 @@ public CompletionStage<StatementResultCursor> runAsync( String statementTemplate
274274
@Override
275275
public StatementResult run( Statement statement )
276276
{
277-
StatementResultCursor cursor = getBlocking( run( statement, false ) );
277+
StatementResultCursor cursor = blockingGet( run( statement, false ) );
278278
return new InternalStatementResult( cursor );
279279
}
280280

driver/src/main/java/org/neo4j/driver/internal/InternalDriver.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929
import org.neo4j.driver.v1.Session;
3030

3131
import static java.util.concurrent.CompletableFuture.completedFuture;
32-
import static org.neo4j.driver.internal.util.Futures.getBlocking;
32+
import static org.neo4j.driver.internal.util.Futures.blockingGet;
3333

3434
public class InternalDriver implements Driver
3535
{
@@ -104,7 +104,7 @@ private Session newSession( AccessMode mode, Bookmark bookmark )
104104
@Override
105105
public void close()
106106
{
107-
getBlocking( closeAsync() );
107+
blockingGet( closeAsync() );
108108
}
109109

110110
@Override

driver/src/main/java/org/neo4j/driver/internal/InternalStatementResult.java

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
import org.neo4j.driver.v1.summary.ResultSummary;
2929
import org.neo4j.driver.v1.util.Function;
3030

31-
import static org.neo4j.driver.internal.util.Futures.getBlocking;
31+
import static org.neo4j.driver.internal.util.Futures.blockingGet;
3232

3333
public class InternalStatementResult implements StatementResult
3434
{
@@ -45,7 +45,7 @@ public List<String> keys()
4545
{
4646
if ( keys == null )
4747
{
48-
getBlocking( cursor.peekAsync() );
48+
blockingGet( cursor.peekAsync() );
4949
keys = cursor.keys();
5050
}
5151
return keys;
@@ -54,13 +54,13 @@ public List<String> keys()
5454
@Override
5555
public boolean hasNext()
5656
{
57-
return getBlocking( cursor.peekAsync() ) != null;
57+
return blockingGet( cursor.peekAsync() ) != null;
5858
}
5959

6060
@Override
6161
public Record next()
6262
{
63-
Record record = getBlocking( cursor.nextAsync() );
63+
Record record = blockingGet( cursor.nextAsync() );
6464
if ( record == null )
6565
{
6666
throw new NoSuchRecordException( "No more records" );
@@ -71,13 +71,13 @@ public Record next()
7171
@Override
7272
public Record single()
7373
{
74-
return getBlocking( cursor.singleAsync() );
74+
return blockingGet( cursor.singleAsync() );
7575
}
7676

7777
@Override
7878
public Record peek()
7979
{
80-
Record record = getBlocking( cursor.peekAsync() );
80+
Record record = blockingGet( cursor.peekAsync() );
8181
if ( record == null )
8282
{
8383
throw new NoSuchRecordException( "Cannot peek past the last record" );
@@ -88,25 +88,25 @@ public Record peek()
8888
@Override
8989
public List<Record> list()
9090
{
91-
return getBlocking( cursor.listAsync() );
91+
return blockingGet( cursor.listAsync() );
9292
}
9393

9494
@Override
9595
public <T> List<T> list( Function<Record, T> mapFunction )
9696
{
97-
return getBlocking( cursor.listAsync( mapFunction ) );
97+
return blockingGet( cursor.listAsync( mapFunction ) );
9898
}
9999

100100
@Override
101101
public ResultSummary consume()
102102
{
103-
return getBlocking( cursor.consumeAsync() );
103+
return blockingGet( cursor.consumeAsync() );
104104
}
105105

106106
@Override
107107
public ResultSummary summary()
108108
{
109-
return getBlocking( cursor.summaryAsync() );
109+
return blockingGet( cursor.summaryAsync() );
110110
}
111111

112112
@Override

driver/src/main/java/org/neo4j/driver/internal/LeakLoggingNetworkSession.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ protected void finalize() throws Throwable
4646

4747
private void logLeakIfNeeded()
4848
{
49-
Boolean isOpen = Futures.getBlocking( currentConnectionIsOpen() );
49+
Boolean isOpen = Futures.blockingGet( currentConnectionIsOpen() );
5050
if ( isOpen )
5151
{
5252
logger.error( "Neo4j Session object leaked, please ensure that your application" +

driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -47,8 +47,8 @@
4747
import org.neo4j.driver.v1.types.TypeSystem;
4848

4949
import static java.util.concurrent.CompletableFuture.completedFuture;
50+
import static org.neo4j.driver.internal.util.Futures.blockingGet;
5051
import static org.neo4j.driver.internal.util.Futures.failedFuture;
51-
import static org.neo4j.driver.internal.util.Futures.getBlocking;
5252
import static org.neo4j.driver.v1.Values.value;
5353

5454
public class NetworkSession implements Session
@@ -132,7 +132,7 @@ public CompletionStage<StatementResultCursor> runAsync( String statementText, Va
132132
@Override
133133
public StatementResult run( Statement statement )
134134
{
135-
StatementResultCursor cursor = getBlocking( runAsync( statement, false ) );
135+
StatementResultCursor cursor = blockingGet( runAsync( statement, false ) );
136136
return new InternalStatementResult( cursor );
137137
}
138138

@@ -152,7 +152,7 @@ public boolean isOpen()
152152
@Override
153153
public void close()
154154
{
155-
getBlocking( closeAsync() );
155+
blockingGet( closeAsync() );
156156
}
157157

158158
@Override
@@ -189,7 +189,7 @@ public CompletionStage<Void> closeAsync()
189189
@Override
190190
public Transaction beginTransaction()
191191
{
192-
return getBlocking( beginTransactionAsync( mode ) );
192+
return blockingGet( beginTransactionAsync( mode ) );
193193
}
194194

195195
@Deprecated
@@ -248,7 +248,7 @@ public String lastBookmark()
248248
@Override
249249
public void reset()
250250
{
251-
getBlocking( resetAsync() );
251+
blockingGet( resetAsync() );
252252
}
253253

254254
private CompletionStage<Void> resetAsync()
@@ -288,7 +288,7 @@ private <T> T transaction( AccessMode mode, TransactionWork<T> work )
288288
// event loop thread will bock and wait for itself to read some data
289289
return retryLogic.retry( () ->
290290
{
291-
try ( Transaction tx = getBlocking( beginTransactionAsync( mode ) ) )
291+
try ( Transaction tx = blockingGet( beginTransactionAsync( mode ) ) )
292292
{
293293
try
294294
{

driver/src/main/java/org/neo4j/driver/internal/async/BootstrapFactory.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,7 @@
2020

2121
import io.netty.bootstrap.Bootstrap;
2222
import io.netty.channel.ChannelOption;
23-
import io.netty.channel.nio.NioEventLoopGroup;
24-
import io.netty.channel.socket.nio.NioSocketChannel;
23+
import io.netty.channel.EventLoopGroup;
2524

2625
public final class BootstrapFactory
2726
{
@@ -31,19 +30,19 @@ private BootstrapFactory()
3130

3231
public static Bootstrap newBootstrap()
3332
{
34-
return newBootstrap( new NioEventLoopGroup() );
33+
return newBootstrap( EventLoopGroupFactory.newEventLoopGroup() );
3534
}
3635

3736
public static Bootstrap newBootstrap( int threadCount )
3837
{
39-
return newBootstrap( new NioEventLoopGroup( threadCount ) );
38+
return newBootstrap( EventLoopGroupFactory.newEventLoopGroup( threadCount ) );
4039
}
4140

42-
private static Bootstrap newBootstrap( NioEventLoopGroup eventLoopGroup )
41+
private static Bootstrap newBootstrap( EventLoopGroup eventLoopGroup )
4342
{
4443
Bootstrap bootstrap = new Bootstrap();
4544
bootstrap.group( eventLoopGroup );
46-
bootstrap.channel( NioSocketChannel.class );
45+
bootstrap.channel( EventLoopGroupFactory.channelClass() );
4746
bootstrap.option( ChannelOption.SO_KEEPALIVE, true );
4847
bootstrap.option( ChannelOption.SO_REUSEADDR, true );
4948
return bootstrap;
Lines changed: 150 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,150 @@
1+
/*
2+
* Copyright (c) 2002-2017 "Neo Technology,"
3+
* Network Engine for Objects in Lund AB [http://neotechnology.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package org.neo4j.driver.internal.async;
20+
21+
import io.netty.bootstrap.Bootstrap;
22+
import io.netty.channel.Channel;
23+
import io.netty.channel.EventLoopGroup;
24+
import io.netty.channel.nio.NioEventLoopGroup;
25+
import io.netty.channel.socket.nio.NioSocketChannel;
26+
import io.netty.util.concurrent.DefaultThreadFactory;
27+
import io.netty.util.concurrent.FastThreadLocalThread;
28+
29+
import java.util.concurrent.Executor;
30+
import java.util.concurrent.Future;
31+
import java.util.concurrent.ThreadFactory;
32+
33+
import org.neo4j.driver.v1.Session;
34+
35+
/**
36+
* Manages creation of Netty {@link EventLoopGroup}s, which are basically {@link Executor}s that perform IO operations.
37+
*/
38+
public final class EventLoopGroupFactory
39+
{
40+
private static final String THREAD_NAME_PREFIX = "Neo4jDriverIO";
41+
private static final int THREAD_PRIORITY = Thread.MAX_PRIORITY;
42+
43+
private EventLoopGroupFactory()
44+
{
45+
}
46+
47+
/**
48+
* Get class of {@link Channel} for {@link Bootstrap#channel(Class)} method.
49+
*
50+
* @return class of the channel, which should be consistent with {@link EventLoopGroup}s returned by
51+
* {@link #newEventLoopGroup()} and {@link #newEventLoopGroup(int)}.
52+
*/
53+
public static Class<? extends Channel> channelClass()
54+
{
55+
return NioSocketChannel.class;
56+
}
57+
58+
/**
59+
* Create new {@link EventLoopGroup} with specified thread count. Returned group should by given to
60+
* {@link Bootstrap#group(EventLoopGroup)}.
61+
*
62+
* @param threadCount amount of IO threads for the new group.
63+
* @return new group consistent with channel class returned by {@link #channelClass()}.
64+
*/
65+
public static EventLoopGroup newEventLoopGroup( int threadCount )
66+
{
67+
return new DriverEventLoopGroup( threadCount );
68+
}
69+
70+
/**
71+
* Create new {@link EventLoopGroup} with default thread count. Returned group should by given to
72+
* {@link Bootstrap#group(EventLoopGroup)}.
73+
*
74+
* @return new group consistent with channel class returned by {@link #channelClass()}.
75+
*/
76+
public static EventLoopGroup newEventLoopGroup()
77+
{
78+
return new DriverEventLoopGroup();
79+
}
80+
81+
/**
82+
* Assert that current thread is not an event loop used for async IO operations. This check is needed because
83+
* blocking API methods like {@link Session#run(String)} are implemented on top of corresponding async API methods
84+
* like {@link Session#runAsync(String)} using basically {@link Future#get()} calls. Deadlocks might happen when IO
85+
* thread executes blocking API call and has to wait for itself to read from the network.
86+
*
87+
* @throws IllegalStateException when current thread is an event loop IO thread.
88+
*/
89+
public static void assertNotInEventLoopThread() throws IllegalStateException
90+
{
91+
if ( Thread.currentThread() instanceof DriverThread )
92+
{
93+
throw new IllegalStateException(
94+
"Blocking operation can't be executed in IO thread because it might result in a deadlock. " +
95+
"Please do not use blocking API when chaining futures returned by async API methods." );
96+
}
97+
}
98+
99+
/**
100+
* Same as {@link NioEventLoopGroup} but uses a different {@link ThreadFactory} that produces threads of
101+
* {@link DriverThread} class. Such threads can be recognized by {@link #assertNotInEventLoopThread()}.
102+
*/
103+
private static class DriverEventLoopGroup extends NioEventLoopGroup
104+
{
105+
DriverEventLoopGroup()
106+
{
107+
}
108+
109+
DriverEventLoopGroup( int nThreads )
110+
{
111+
super( nThreads );
112+
}
113+
114+
@Override
115+
protected ThreadFactory newDefaultThreadFactory()
116+
{
117+
return new DriverThreadFactory();
118+
}
119+
}
120+
121+
/**
122+
* Same as {@link DefaultThreadFactory} created by {@link NioEventLoopGroup} by default, except produces threads of
123+
* {@link DriverThread} class. Such threads can be recognized by {@link #assertNotInEventLoopThread()}.
124+
*/
125+
private static class DriverThreadFactory extends DefaultThreadFactory
126+
{
127+
DriverThreadFactory()
128+
{
129+
super( THREAD_NAME_PREFIX, THREAD_PRIORITY );
130+
}
131+
132+
@Override
133+
protected Thread newThread( Runnable r, String name )
134+
{
135+
return new DriverThread( threadGroup, r, name );
136+
}
137+
}
138+
139+
/**
140+
* Same as default thread created by {@link DefaultThreadFactory} except this dedicated class can be easily
141+
* recognized by {@link #assertNotInEventLoopThread()}.
142+
*/
143+
private static class DriverThread extends FastThreadLocalThread
144+
{
145+
DriverThread( ThreadGroup group, Runnable target, String name )
146+
{
147+
super( group, target, name );
148+
}
149+
}
150+
}

0 commit comments

Comments
 (0)