Skip to content

Commit d94e425

Browse files
Propagate exception handler to connections
1 parent b66ce0a commit d94e425

File tree

4 files changed

+57
-4
lines changed

4 files changed

+57
-4
lines changed

src/com/rabbitmq/client/Connection.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -233,4 +233,6 @@ public interface Connection extends ShutdownNotifier { // rename to AMQPConnecti
233233
* Remove all {@link BlockedListener}s.
234234
*/
235235
void clearBlockedListeners();
236+
237+
ExceptionHandler getExceptionHandler();
236238
}

src/com/rabbitmq/client/ConnectionFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ public class ConnectionFactory implements Cloneable {
9191
private SaslConfig saslConfig = DefaultSaslConfig.PLAIN;
9292
private ExecutorService sharedExecutor;
9393
private SocketConfigurator socketConf = new DefaultSocketConfigurator();
94-
private ExceptionHandler exceptionHandler = new DefaultExceptionHandler();
94+
private ExceptionHandler exceptionHandler = null;
9595

9696
private boolean automaticRecovery = false;
9797
private boolean topologyRecovery = true;

src/com/rabbitmq/client/impl/ConnectionParams.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -51,9 +51,8 @@ public ConnectionParams(String username, String password, ExecutorService execut
5151
this.saslConfig = saslConfig;
5252
this.networkRecoveryInterval = networkRecoveryInterval;
5353
this.topologyRecovery = topologyRecovery;
54-
5554

56-
this.exceptionHandler = new DefaultExceptionHandler();
55+
this.exceptionHandler = exceptionHandler;
5756
}
5857

5958
public String getUsername() {
@@ -101,7 +100,7 @@ public void setExceptionHandler(ExceptionHandler exceptionHandler) {
101100
}
102101

103102
public ConnectionParams exceptionHandler(ExceptionHandler exceptionHandler) {
104-
this.exceptionHandler = exceptionHandler;
103+
setExceptionHandler(exceptionHandler);
105104
return this;
106105
}
107106

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
package com.rabbitmq.client.test.functional;
2+
3+
import com.rabbitmq.client.AMQP;
4+
import com.rabbitmq.client.Channel;
5+
import com.rabbitmq.client.Connection;
6+
import com.rabbitmq.client.ConnectionFactory;
7+
import com.rabbitmq.client.Consumer;
8+
import com.rabbitmq.client.DefaultConsumer;
9+
import com.rabbitmq.client.Envelope;
10+
import com.rabbitmq.client.ExceptionHandler;
11+
import com.rabbitmq.client.impl.DefaultExceptionHandler;
12+
import junit.framework.TestCase;
13+
14+
import java.io.IOException;
15+
import java.util.concurrent.CountDownLatch;
16+
import java.util.concurrent.TimeUnit;
17+
18+
public class ExceptionHandling extends TestCase {
19+
private ConnectionFactory newConnectionFactory(ExceptionHandler eh) {
20+
ConnectionFactory cf = new ConnectionFactory();
21+
cf.setExceptionHandler(eh);
22+
return cf;
23+
}
24+
25+
public void testHandleConsumerException() throws IOException, InterruptedException {
26+
final CountDownLatch latch = new CountDownLatch(1);
27+
final DefaultExceptionHandler eh = new DefaultExceptionHandler() {
28+
@Override
29+
public void handleConsumerException(Channel channel, Throwable exception, Consumer consumer, String consumerTag, String methodName) {
30+
latch.countDown();
31+
}
32+
};
33+
ConnectionFactory cf = newConnectionFactory(eh);
34+
assertEquals(cf.getExceptionHandler(), eh);
35+
Connection conn = cf.newConnection();
36+
assertEquals(conn.getExceptionHandler(), eh);
37+
Channel ch = conn.createChannel();
38+
String q = ch.queueDeclare().getQueue();
39+
ch.basicConsume(q, new DefaultConsumer(ch) {
40+
@Override
41+
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
42+
throw new RuntimeException("oops");
43+
}
44+
});
45+
ch.basicPublish("", q, null, "".getBytes());
46+
wait(latch);
47+
}
48+
49+
private void wait(CountDownLatch latch) throws InterruptedException {
50+
latch.await(30, TimeUnit.MINUTES);
51+
}
52+
}

0 commit comments

Comments
 (0)