Skip to content

Commit 1e805d0

Browse files
committed
Improve error handing in channel pipeline
Introduced dedicated inbound handler that handles pipeline errors and channel inactive events. Previously this logic was only available in `InboundMessageHandler`. Now `OutboundMessageHandler` will also fire exception caught events and rely on `ChannelErrorHandler` to handles them. This makes `ChannelErrorHandler` the central place of error handling. Made all channel writes use void promise because write errors will be propagated to corresponding handlers by `ChannelErrorHandler`. Added a separate class `ChannelPipelineBuilder` that adds all needed handlers to the pipeline when channel is created. Addressed couple failing tests that used 3.1+ functionality and thus failed on 3.0.
1 parent c6251ea commit 1e805d0

19 files changed

+527
-217
lines changed
Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
/*
2+
* Copyright (c) 2002-2017 "Neo Technology,"
3+
* Network Engine for Objects in Lund AB [http://neotechnology.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package org.neo4j.driver.internal.async;
20+
21+
import io.netty.channel.ChannelHandlerContext;
22+
import io.netty.channel.ChannelInboundHandlerAdapter;
23+
import io.netty.handler.codec.CodecException;
24+
25+
import java.io.IOException;
26+
27+
import org.neo4j.driver.internal.async.inbound.InboundMessageDispatcher;
28+
import org.neo4j.driver.v1.Logger;
29+
import org.neo4j.driver.v1.Logging;
30+
import org.neo4j.driver.v1.exceptions.ServiceUnavailableException;
31+
32+
import static java.util.Objects.requireNonNull;
33+
import static org.neo4j.driver.internal.async.ChannelAttributes.messageDispatcher;
34+
35+
public class ChannelErrorHandler extends ChannelInboundHandlerAdapter
36+
{
37+
private final Logger log;
38+
39+
private InboundMessageDispatcher messageDispatcher;
40+
private boolean failed;
41+
42+
public ChannelErrorHandler( Logging logging )
43+
{
44+
this.log = logging.getLog( getClass().getSimpleName() );
45+
}
46+
47+
@Override
48+
public void handlerAdded( ChannelHandlerContext ctx )
49+
{
50+
messageDispatcher = requireNonNull( messageDispatcher( ctx.channel() ) );
51+
}
52+
53+
@Override
54+
public void handlerRemoved( ChannelHandlerContext ctx )
55+
{
56+
messageDispatcher = null;
57+
failed = false;
58+
}
59+
60+
@Override
61+
public void channelInactive( ChannelHandlerContext ctx )
62+
{
63+
log.debug( "Channel inactive: %s", ctx.channel() );
64+
65+
if ( !failed )
66+
{
67+
// channel became inactive not because of a fatal exception that came from exceptionCaught
68+
// it is most likely inactive because actual network connection broke
69+
ServiceUnavailableException error = new ServiceUnavailableException(
70+
"Connection to the database terminated. " +
71+
"This can happen due to network instabilities, or due to restarts of the database" );
72+
73+
fail( ctx, error );
74+
}
75+
}
76+
77+
@Override
78+
public void exceptionCaught( ChannelHandlerContext ctx, Throwable error )
79+
{
80+
if ( failed )
81+
{
82+
log.warn( "Another fatal error in the pipeline of " + ctx.channel(), error );
83+
}
84+
else
85+
{
86+
failed = true;
87+
log.error( "Fatal error in the pipeline of " + ctx.channel(), error );
88+
fail( ctx, error );
89+
}
90+
}
91+
92+
private void fail( ChannelHandlerContext ctx, Throwable error )
93+
{
94+
Throwable cause = transformError( error );
95+
messageDispatcher.handleFatalError( cause );
96+
log.debug( "Closing channel: %s", ctx.channel() );
97+
ctx.close();
98+
}
99+
100+
private Throwable transformError( Throwable error )
101+
{
102+
if ( error instanceof CodecException )
103+
{
104+
// unwrap exception from message encoder/decoder
105+
error = error.getCause();
106+
}
107+
108+
if ( error instanceof IOException )
109+
{
110+
return new ServiceUnavailableException( "Connection to the database failed", error );
111+
}
112+
else
113+
{
114+
return error;
115+
}
116+
}
117+
}
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
/*
2+
* Copyright (c) 2002-2017 "Neo Technology,"
3+
* Network Engine for Objects in Lund AB [http://neotechnology.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package org.neo4j.driver.internal.async;
20+
21+
import io.netty.channel.Channel;
22+
import io.netty.channel.ChannelPipeline;
23+
24+
import org.neo4j.driver.internal.async.inbound.ChunkDecoder;
25+
import org.neo4j.driver.internal.async.inbound.InboundMessageHandler;
26+
import org.neo4j.driver.internal.async.inbound.MessageDecoder;
27+
import org.neo4j.driver.internal.async.outbound.OutboundMessageHandler;
28+
import org.neo4j.driver.internal.messaging.MessageFormat;
29+
import org.neo4j.driver.v1.Logging;
30+
31+
public final class ChannelPipelineBuilder
32+
{
33+
private ChannelPipelineBuilder()
34+
{
35+
}
36+
37+
public static void buildPipeline( Channel channel, MessageFormat messageFormat, Logging logging )
38+
{
39+
ChannelPipeline pipeline = channel.pipeline();
40+
41+
// inbound handlers
42+
pipeline.addLast( new ChunkDecoder() );
43+
pipeline.addLast( new MessageDecoder() );
44+
pipeline.addLast( new InboundMessageHandler( messageFormat, logging ) );
45+
46+
// outbound handlers
47+
pipeline.addLast( OutboundMessageHandler.NAME, new OutboundMessageHandler( messageFormat, logging ) );
48+
49+
// last one - error handler
50+
pipeline.addLast( new ChannelErrorHandler( logging ) );
51+
}
52+
}

driver/src/main/java/org/neo4j/driver/internal/async/HandshakeCompletedListener.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ public void operationComplete( ChannelFuture future )
5757
AsyncInitResponseHandler handler = new AsyncInitResponseHandler( connectionInitializedPromise );
5858

5959
messageDispatcher( channel ).queue( handler );
60-
channel.writeAndFlush( message );
60+
channel.writeAndFlush( message, channel.voidPromise() );
6161
}
6262
else
6363
{

driver/src/main/java/org/neo4j/driver/internal/async/HandshakeResponseHandler.java

Lines changed: 4 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -28,31 +28,27 @@
2828
import java.util.List;
2929
import javax.net.ssl.SSLHandshakeException;
3030

31-
import org.neo4j.driver.internal.async.inbound.ChunkDecoder;
32-
import org.neo4j.driver.internal.async.inbound.InboundMessageHandler;
33-
import org.neo4j.driver.internal.async.inbound.MessageDecoder;
34-
import org.neo4j.driver.internal.async.outbound.OutboundMessageHandler;
3531
import org.neo4j.driver.internal.messaging.MessageFormat;
3632
import org.neo4j.driver.internal.messaging.PackStreamMessageFormatV1;
3733
import org.neo4j.driver.v1.Logger;
3834
import org.neo4j.driver.v1.Logging;
3935
import org.neo4j.driver.v1.exceptions.ClientException;
4036
import org.neo4j.driver.v1.exceptions.SecurityException;
4137

42-
import static java.util.Objects.requireNonNull;
4338
import static org.neo4j.driver.internal.async.ProtocolUtil.HTTP;
4439
import static org.neo4j.driver.internal.async.ProtocolUtil.NO_PROTOCOL_VERSION;
4540
import static org.neo4j.driver.internal.async.ProtocolUtil.PROTOCOL_VERSION_1;
46-
import static org.neo4j.driver.internal.logging.DevNullLogging.DEV_NULL_LOGGING;
4741

4842
public class HandshakeResponseHandler extends ReplayingDecoder<Void>
4943
{
5044
private final ChannelPromise handshakeCompletedPromise;
45+
private final Logging logging;
5146
private final Logger log;
5247

5348
public HandshakeResponseHandler( ChannelPromise handshakeCompletedPromise, Logging logging )
5449
{
55-
this.handshakeCompletedPromise = requireNonNull( handshakeCompletedPromise );
50+
this.handshakeCompletedPromise = handshakeCompletedPromise;
51+
this.logging = logging;
5652
this.log = logging.getLog( getClass().getSimpleName() );
5753
}
5854

@@ -71,7 +67,6 @@ public void exceptionCaught( ChannelHandlerContext ctx, Throwable error )
7167
}
7268
}
7369

74-
// todo: do not use DEV_NULL_LOGGING
7570
@Override
7671
protected void decode( ChannelHandlerContext ctx, ByteBuf in, List<Object> out )
7772
{
@@ -86,15 +81,7 @@ protected void decode( ChannelHandlerContext ctx, ByteBuf in, List<Object> out )
8681
{
8782
case PROTOCOL_VERSION_1:
8883
MessageFormat format = new PackStreamMessageFormatV1();
89-
90-
// inbound handlers
91-
pipeline.addLast( new ChunkDecoder() );
92-
pipeline.addLast( new MessageDecoder() );
93-
pipeline.addLast( new InboundMessageHandler( format, DEV_NULL_LOGGING ) );
94-
95-
// outbound handlers
96-
pipeline.addLast( OutboundMessageHandler.NAME, new OutboundMessageHandler( format, DEV_NULL_LOGGING ) );
97-
84+
ChannelPipelineBuilder.buildPipeline( ctx.channel(), format, logging );
9885
handshakeCompletedPromise.setSuccess();
9986

10087
break;

driver/src/main/java/org/neo4j/driver/internal/async/NettyConnection.java

Lines changed: 6 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
package org.neo4j.driver.internal.async;
2020

2121
import io.netty.channel.Channel;
22-
import io.netty.channel.EventLoop;
2322
import io.netty.channel.pool.ChannelPool;
2423
import io.netty.util.concurrent.Promise;
2524

@@ -158,30 +157,12 @@ private void reset( ResponseHandler resetHandler )
158157
private void writeMessagesInEventLoop( Message message1, ResponseHandler handler1, Message message2,
159158
ResponseHandler handler2, boolean flush )
160159
{
161-
EventLoop eventLoop = channel.eventLoop();
162-
163-
if ( eventLoop.inEventLoop() )
164-
{
165-
writeMessages( message1, handler1, message2, handler2, flush );
166-
}
167-
else
168-
{
169-
eventLoop.execute( () -> writeMessages( message1, handler1, message2, handler2, flush ) );
170-
}
160+
channel.eventLoop().execute( () -> writeMessages( message1, handler1, message2, handler2, flush ) );
171161
}
172162

173163
private void writeAndFlushMessageInEventLoop( Message message, ResponseHandler handler )
174164
{
175-
EventLoop eventLoop = channel.eventLoop();
176-
177-
if ( eventLoop.inEventLoop() )
178-
{
179-
writeAndFlushMessage( message, handler );
180-
}
181-
else
182-
{
183-
eventLoop.execute( () -> writeAndFlushMessage( message, handler ) );
184-
}
165+
channel.eventLoop().execute( () -> writeAndFlushMessage( message, handler ) );
185166
}
186167

187168
private void writeMessages( Message message1, ResponseHandler handler1, Message message2, ResponseHandler handler2,
@@ -190,22 +171,22 @@ private void writeMessages( Message message1, ResponseHandler handler1, Message
190171
messageDispatcher.queue( handler1 );
191172
messageDispatcher.queue( handler2 );
192173

193-
channel.write( message1 );
174+
channel.write( message1, channel.voidPromise() );
194175

195176
if ( flush )
196177
{
197-
channel.writeAndFlush( message2 );
178+
channel.writeAndFlush( message2, channel.voidPromise() );
198179
}
199180
else
200181
{
201-
channel.write( message2 );
182+
channel.write( message2, channel.voidPromise() );
202183
}
203184
}
204185

205186
private void writeAndFlushMessage( Message message, ResponseHandler handler )
206187
{
207188
messageDispatcher.queue( handler );
208-
channel.writeAndFlush( message );
189+
channel.writeAndFlush( message, channel.voidPromise() );
209190
}
210191

211192
private void setAutoRead( boolean value )

driver/src/main/java/org/neo4j/driver/internal/async/inbound/InboundMessageDispatcher.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,7 @@ public void handleFailureMessage( String code, String message )
135135

136136
// queue ACK_FAILURE before notifying the next response handler
137137
queue( new AckFailureResponseHandler( this ) );
138-
channel.writeAndFlush( AckFailureMessage.ACK_FAILURE );
138+
channel.writeAndFlush( AckFailureMessage.ACK_FAILURE, channel.voidPromise() );
139139

140140
ResponseHandler handler = handlers.remove();
141141
handler.onFailure( currentError );

0 commit comments

Comments
 (0)