Skip to content

Commit 869bcb0

Browse files
merge default into bug26008
2 parents f19e850 + d231fc1 commit 869bcb0

File tree

10 files changed

+90
-15
lines changed

10 files changed

+90
-15
lines changed

src/com/rabbitmq/client/Connection.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -233,4 +233,6 @@ public interface Connection extends ShutdownNotifier { // rename to AMQPConnecti
233233
* Remove all {@link BlockedListener}s.
234234
*/
235235
void clearBlockedListeners();
236+
237+
ExceptionHandler getExceptionHandler();
236238
}

src/com/rabbitmq/client/ConnectionFactory.java

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333

3434
import com.rabbitmq.client.impl.AMQConnection;
3535
import com.rabbitmq.client.impl.ConnectionParams;
36+
import com.rabbitmq.client.impl.DefaultExceptionHandler;
3637
import com.rabbitmq.client.impl.FrameHandler;
3738
import com.rabbitmq.client.impl.FrameHandlerFactory;
3839
import com.rabbitmq.client.impl.recovery.AutorecoveringConnection;
@@ -90,6 +91,7 @@ public class ConnectionFactory implements Cloneable {
9091
private SaslConfig saslConfig = DefaultSaslConfig.PLAIN;
9192
private ExecutorService sharedExecutor;
9293
private SocketConfigurator socketConf = new DefaultSocketConfigurator();
94+
private ExceptionHandler exceptionHandler = null;
9395

9496
private boolean automaticRecovery = false;
9597
private boolean topologyRecovery = true;
@@ -426,6 +428,25 @@ public void setSharedExecutor(ExecutorService executor) {
426428
this.sharedExecutor = executor;
427429
}
428430

431+
/**
432+
* Get the exception handler.
433+
*
434+
* @see ExceptionHandler
435+
*/
436+
public ExceptionHandler getExceptionHandler() {
437+
return exceptionHandler;
438+
}
439+
440+
/**
441+
* Set the exception handler to use by default for newly created connections.
442+
*
443+
* @see ExceptionHandler
444+
*/
445+
446+
public void setExceptionHandler(ExceptionHandler exceptionHandler) {
447+
this.exceptionHandler = exceptionHandler;
448+
}
449+
429450
public boolean isSSL(){
430451
return getSocketFactory() instanceof SSLSocketFactory;
431452
}
@@ -559,7 +580,7 @@ public Connection newConnection(ExecutorService executor, Address[] addrs)
559580
public ConnectionParams params(ExecutorService executor) {
560581
return new ConnectionParams(username, password, executor, virtualHost, getClientProperties(),
561582
requestedFrameMax, requestedChannelMax, requestedHeartbeat, saslConfig,
562-
networkRecoveryInterval, topologyRecovery);
583+
networkRecoveryInterval, topologyRecovery, exceptionHandler);
563584
}
564585

565586
/**

src/com/rabbitmq/client/impl/ExceptionHandler.java renamed to src/com/rabbitmq/client/ExceptionHandler.java

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,7 @@
1414
// Copyright (c) 2007-2013 GoPivotal, Inc. All rights reserved.
1515
//
1616

17-
package com.rabbitmq.client.impl;
18-
19-
import com.rabbitmq.client.Channel;
20-
import com.rabbitmq.client.Connection;
21-
import com.rabbitmq.client.Consumer;
22-
import com.rabbitmq.client.TopologyRecoveryException;
17+
package com.rabbitmq.client;
2318

2419
/**
2520
* Interface to an exception-handling object.

src/com/rabbitmq/client/impl/AMQConnection.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import com.rabbitmq.client.AMQP;
3232
import com.rabbitmq.client.AuthenticationFailureException;
3333
import com.rabbitmq.client.BlockedListener;
34+
import com.rabbitmq.client.ExceptionHandler;
3435
import com.rabbitmq.client.Method;
3536
import com.rabbitmq.client.AlreadyClosedException;
3637
import com.rabbitmq.client.Channel;
@@ -110,7 +111,7 @@ public static final Map<String, Object> defaultClientProperties() {
110111
private volatile boolean _running = false;
111112

112113
/** Handler for (uncaught) exceptions that crop up in the {@link MainLoop}. */
113-
private final ExceptionHandler _exceptionHandler;
114+
private ExceptionHandler _exceptionHandler = new DefaultExceptionHandler();
114115

115116
/** Object used for blocking main application thread when doing all the necessary
116117
* connection shutdown operations
@@ -214,6 +215,7 @@ public AMQConnection(ConnectionParams params, FrameHandler frameHandler)
214215
this.saslConfig = params.getSaslConfig();
215216

216217
this._workService = new ConsumerWorkService(params.getExecutor());
218+
this._exceptionHandler = params.getExceptionHandler();
217219
this._channelManager = null;
218220

219221
this._heartbeatSender = new HeartbeatSender(frameHandler);

src/com/rabbitmq/client/impl/ConnectionParams.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package com.rabbitmq.client.impl;
22

3+
import com.rabbitmq.client.ExceptionHandler;
34
import com.rabbitmq.client.SaslConfig;
45

56
import java.util.Map;
@@ -32,12 +33,13 @@ public class ConnectionParams {
3233
* @param saslConfig sasl configuration hook
3334
* @param networkRecoveryInterval interval used when recovering from network failure
3435
* @param topologyRecovery should topology (queues, exchanges, bindings, consumers) recovery be performed?
36+
* @param exceptionHandler
3537
*/
3638
public ConnectionParams(String username, String password, ExecutorService executor,
3739
String virtualHost, Map<String, Object> clientProperties,
3840
int requestedFrameMax, int requestedChannelMax, int requestedHeartbeat,
3941
SaslConfig saslConfig, int networkRecoveryInterval,
40-
boolean topologyRecovery) {
42+
boolean topologyRecovery, ExceptionHandler exceptionHandler) {
4143
this.username = username;
4244
this.password = password;
4345
this.executor = executor;
@@ -49,9 +51,8 @@ public ConnectionParams(String username, String password, ExecutorService execut
4951
this.saslConfig = saslConfig;
5052
this.networkRecoveryInterval = networkRecoveryInterval;
5153
this.topologyRecovery = topologyRecovery;
52-
5354

54-
this.exceptionHandler = new DefaultExceptionHandler();
55+
this.exceptionHandler = exceptionHandler;
5556
}
5657

5758
public String getUsername() {
@@ -99,7 +100,7 @@ public void setExceptionHandler(ExceptionHandler exceptionHandler) {
99100
}
100101

101102
public ConnectionParams exceptionHandler(ExceptionHandler exceptionHandler) {
102-
this.exceptionHandler = exceptionHandler;
103+
setExceptionHandler(exceptionHandler);
103104
return this;
104105
}
105106

src/com/rabbitmq/client/impl/DefaultExceptionHandler.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,11 @@
2424
import com.rabbitmq.client.Channel;
2525
import com.rabbitmq.client.Connection;
2626
import com.rabbitmq.client.Consumer;
27+
import com.rabbitmq.client.ExceptionHandler;
2728
import com.rabbitmq.client.TopologyRecoveryException;
2829

2930
/**
30-
* Default implementation of {@link ExceptionHandler} used by {@link AMQConnection}.
31+
* Default implementation of {@link com.rabbitmq.client.ExceptionHandler} used by {@link AMQConnection}.
3132
*/
3233
public class DefaultExceptionHandler implements ExceptionHandler {
3334
public void handleUnexpectedConnectionDriverException(Connection conn, Throwable exception) {

src/com/rabbitmq/client/impl/recovery/AutorecoveringConnection.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
import com.rabbitmq.client.ShutdownSignalException;
1212
import com.rabbitmq.client.TopologyRecoveryException;
1313
import com.rabbitmq.client.impl.ConnectionParams;
14-
import com.rabbitmq.client.impl.ExceptionHandler;
14+
import com.rabbitmq.client.ExceptionHandler;
1515
import com.rabbitmq.client.impl.FrameHandlerFactory;
1616
import com.rabbitmq.client.impl.NetworkConnection;
1717

test/src/com/rabbitmq/client/test/AMQConnectionTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@
3535
import com.rabbitmq.client.ConnectionFactory;
3636
import com.rabbitmq.client.Consumer;
3737
import com.rabbitmq.client.impl.AMQConnection;
38-
import com.rabbitmq.client.impl.ExceptionHandler;
38+
import com.rabbitmq.client.ExceptionHandler;
3939
import com.rabbitmq.client.impl.Frame;
4040
import com.rabbitmq.client.impl.FrameHandler;
4141

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
package com.rabbitmq.client.test.functional;
2+
3+
import com.rabbitmq.client.AMQP;
4+
import com.rabbitmq.client.Channel;
5+
import com.rabbitmq.client.Connection;
6+
import com.rabbitmq.client.ConnectionFactory;
7+
import com.rabbitmq.client.Consumer;
8+
import com.rabbitmq.client.DefaultConsumer;
9+
import com.rabbitmq.client.Envelope;
10+
import com.rabbitmq.client.ExceptionHandler;
11+
import com.rabbitmq.client.impl.DefaultExceptionHandler;
12+
import junit.framework.TestCase;
13+
14+
import java.io.IOException;
15+
import java.util.concurrent.CountDownLatch;
16+
import java.util.concurrent.TimeUnit;
17+
18+
public class ExceptionHandling extends TestCase {
19+
private ConnectionFactory newConnectionFactory(ExceptionHandler eh) {
20+
ConnectionFactory cf = new ConnectionFactory();
21+
cf.setExceptionHandler(eh);
22+
return cf;
23+
}
24+
25+
public void testHandleConsumerException() throws IOException, InterruptedException {
26+
final CountDownLatch latch = new CountDownLatch(1);
27+
final DefaultExceptionHandler eh = new DefaultExceptionHandler() {
28+
@Override
29+
public void handleConsumerException(Channel channel, Throwable exception, Consumer consumer, String consumerTag, String methodName) {
30+
latch.countDown();
31+
}
32+
};
33+
ConnectionFactory cf = newConnectionFactory(eh);
34+
assertEquals(cf.getExceptionHandler(), eh);
35+
Connection conn = cf.newConnection();
36+
assertEquals(conn.getExceptionHandler(), eh);
37+
Channel ch = conn.createChannel();
38+
String q = ch.queueDeclare().getQueue();
39+
ch.basicConsume(q, new DefaultConsumer(ch) {
40+
@Override
41+
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
42+
throw new RuntimeException("oops");
43+
}
44+
});
45+
ch.basicPublish("", q, null, "".getBytes());
46+
wait(latch);
47+
}
48+
49+
private void wait(CountDownLatch latch) throws InterruptedException {
50+
latch.await(30, TimeUnit.MINUTES);
51+
}
52+
}

test/src/com/rabbitmq/client/test/functional/FunctionalTests.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,5 +76,6 @@ public static void add(TestSuite suite) {
7676
suite.addTestSuite(ConsumerPriorities.class);
7777
suite.addTestSuite(Policies.class);
7878
suite.addTestSuite(ConnectionRecovery.class);
79+
suite.addTestSuite(ExceptionHandling.class);
7980
}
8081
}

0 commit comments

Comments
 (0)