Skip to content

Commit 4fc376c

Browse files
author
Simon MacMullen
committed
Don't make ConnectionParams mutable WRT exception handler, it's not mutable for anything else. That means we can skip the final safety check in the AMQConnection ctor.
1 parent 4da2185 commit 4fc376c

File tree

4 files changed

+40
-61
lines changed

4 files changed

+40
-61
lines changed

src/com/rabbitmq/client/impl/AMQConnection.java

Lines changed: 2 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -205,14 +205,15 @@ public AMQConnection(ConnectionParams params, FrameHandler frameHandler)
205205
this.password = params.getPassword();
206206
this._frameHandler = frameHandler;
207207
this._virtualHost = params.getVirtualHost();
208+
this._exceptionHandler = params.getExceptionHandler();
209+
208210
this._clientProperties = new HashMap<String, Object>(params.getClientProperties());
209211
this.requestedFrameMax = params.getRequestedFrameMax();
210212
this.requestedChannelMax = params.getRequestedChannelMax();
211213
this.requestedHeartbeat = params.getRequestedHeartbeat();
212214
this.saslConfig = params.getSaslConfig();
213215

214216
this._workService = new ConsumerWorkService(params.getExecutor());
215-
this._exceptionHandler = initExceptionHandler(params);
216217
this._channelManager = null;
217218

218219
this._heartbeatSender = new HeartbeatSender(frameHandler);
@@ -221,17 +222,6 @@ public AMQConnection(ConnectionParams params, FrameHandler frameHandler)
221222
this._inConnectionNegotiation = true; // we start out waiting for the first protocol response
222223
}
223224

224-
private ExceptionHandler initExceptionHandler(ConnectionParams params) {
225-
ExceptionHandler result = params.getExceptionHandler();
226-
// null exception handler causes I/O thread to fail,
227-
// so be extra defensive here
228-
if(result == null) {
229-
return new DefaultExceptionHandler();
230-
} else {
231-
return result;
232-
}
233-
}
234-
235225
/**
236226
* Start up the connection, including the MainLoop thread.
237227
* Sends the protocol

src/com/rabbitmq/client/impl/ConnectionParams.java

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,6 @@ public ConnectionParams(String username, String password, ExecutorService execut
5151
this.saslConfig = saslConfig;
5252
this.networkRecoveryInterval = networkRecoveryInterval;
5353
this.topologyRecovery = topologyRecovery;
54-
5554
this.exceptionHandler = exceptionHandler;
5655
}
5756

@@ -95,15 +94,6 @@ public ExceptionHandler getExceptionHandler() {
9594
return exceptionHandler;
9695
}
9796

98-
public void setExceptionHandler(ExceptionHandler exceptionHandler) {
99-
this.exceptionHandler = exceptionHandler;
100-
}
101-
102-
public ConnectionParams exceptionHandler(ExceptionHandler exceptionHandler) {
103-
setExceptionHandler(exceptionHandler);
104-
return this;
105-
}
106-
10797
public int getNetworkRecoveryInterval() {
10898
return networkRecoveryInterval;
10999
}

test/src/com/rabbitmq/client/test/AMQConnectionTest.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ public static TestSuite suite() {
5959
/** The mock frame handler used to test connection behaviour. */
6060
private MockFrameHandler _mockFrameHandler;
6161
private ConnectionFactory factory;
62+
private MyExceptionHandler exceptionHandler;
6263

6364
/** Setup the environment for this test
6465
* @see junit.framework.TestCase#setUp()
@@ -68,6 +69,8 @@ public static TestSuite suite() {
6869
super.setUp();
6970
_mockFrameHandler = new MockFrameHandler();
7071
factory = new ConnectionFactory();
72+
exceptionHandler = new MyExceptionHandler();
73+
factory.setExceptionHandler(exceptionHandler);
7174
}
7275

7376
/** Tear down the environment for this test
@@ -86,19 +89,17 @@ public static TestSuite suite() {
8689
public void testConnectionSendsSingleHeaderAndTimesOut() {
8790
IOException exception = new SocketTimeoutException();
8891
_mockFrameHandler.setExceptionOnReadingFrames(exception);
89-
MyExceptionHandler handler = new MyExceptionHandler();
9092
assertEquals(0, _mockFrameHandler.countHeadersSent());
9193
try {
9294
ConnectionParams params = factory.params(Executors.newFixedThreadPool(1));
93-
params.setExceptionHandler(handler);
9495
new AMQConnection(params, _mockFrameHandler).start();
9596
fail("Connection should have thrown exception");
9697
} catch(IOException signal) {
9798
// As expected
9899
}
99100
assertEquals(1, _mockFrameHandler.countHeadersSent());
100101
// _connection.close(0, CLOSE_MESSAGE);
101-
List<Throwable> exceptionList = handler.getHandledExceptions();
102+
List<Throwable> exceptionList = exceptionHandler.getHandledExceptions();
102103
assertEquals(Collections.<Throwable>singletonList(exception), exceptionList);
103104
}
104105

@@ -124,7 +125,6 @@ public void testConnectionHangInNegotiation() {
124125
assertEquals(0, this._mockFrameHandler.countHeadersSent());
125126
try {
126127
ConnectionParams params = factory.params(Executors.newFixedThreadPool(1));
127-
params.setExceptionHandler(handler);
128128
new AMQConnection(params, this._mockFrameHandler).start();
129129
fail("Connection should have thrown exception");
130130
} catch(IOException signal) {

test/src/com/rabbitmq/client/test/CloseInMainLoop.java

Lines changed: 34 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -36,10 +36,38 @@
3636
import com.rabbitmq.client.impl.SocketFrameHandler;
3737

3838
public class CloseInMainLoop extends BrokerTestCase{
39+
private final CountDownLatch closeLatch = new CountDownLatch(1);
40+
41+
private ConnectionFactory specialConnectionFactory() {
42+
ConnectionFactory f = new ConnectionFactory();
43+
f.setExceptionHandler(new DefaultExceptionHandler(){
44+
@Override
45+
public void handleConsumerException(Channel channel,
46+
Throwable exception,
47+
Consumer consumer,
48+
String consumerTag,
49+
String methodName) {
50+
try {
51+
// TODO: change this to call 4-parameter close and make 6-parm one private
52+
((AMQConnection) channel.getConnection())
53+
.close(AMQP.INTERNAL_ERROR,
54+
"Internal error in Consumer " + consumerTag,
55+
false,
56+
exception,
57+
-1,
58+
false);
59+
} catch (Throwable e) {
60+
// Man, this clearly isn't our day.
61+
// TODO: Log the nested failure
62+
} finally {
63+
closeLatch.countDown();
64+
}
65+
}
66+
});
67+
return f;
68+
}
3969

40-
private final CountDownLatch closeLatch = new CountDownLatch(1);
41-
42-
class SpecialConnection extends AMQConnection{
70+
class SpecialConnection extends AMQConnection{
4371
private AtomicBoolean validShutdown = new AtomicBoolean(false);
4472

4573
public boolean hadValidShutdown(){
@@ -48,45 +76,16 @@ public boolean hadValidShutdown(){
4876
}
4977

5078
public SpecialConnection() throws Exception {
51-
this(new ConnectionFactory());
52-
}
53-
54-
private SpecialConnection(ConnectionFactory factory) throws Exception{
55-
super(factory.params(Executors.newFixedThreadPool(1)).exceptionHandler(
56-
new DefaultExceptionHandler(){
57-
@Override
58-
public void handleConsumerException(Channel channel,
59-
Throwable exception,
60-
Consumer consumer,
61-
String consumerTag,
62-
String methodName) {
63-
try {
64-
// TODO: change this to call 4-parameter close and make 6-parm one private
65-
((AMQConnection) channel.getConnection())
66-
.close(AMQP.INTERNAL_ERROR,
67-
"Internal error in Consumer " + consumerTag,
68-
false,
69-
exception,
70-
-1,
71-
false);
72-
} catch (Throwable e) {
73-
// Man, this clearly isn't our day.
74-
// TODO: Log the nested failure
75-
} finally {
76-
closeLatch.countDown();
77-
}
78-
}
79-
}
80-
), new SocketFrameHandler(SocketFactory.getDefault().createSocket("localhost", AMQP.PROTOCOL.PORT)));
79+
super(specialConnectionFactory().params(Executors.newFixedThreadPool(1)),
80+
new SocketFrameHandler(SocketFactory.getDefault().createSocket("localhost", AMQP.PROTOCOL.PORT)));
8181
this.start();
82-
}
82+
}
8383

8484
@Override
8585
public boolean processControlCommand(Command c) throws IOException{
8686
if(c.getMethod() instanceof AMQP.Connection.CloseOk) validShutdown.set(true);
8787
return super.processControlCommand(c);
8888
}
89-
9089
}
9190

9291
public void testCloseOKNormallyReceived() throws Exception{

0 commit comments

Comments
 (0)