Skip to content

Commit 1abb5c2

Browse files
committed
Add test case for broken RpcClient on broker restart
The RpcClient registers a reply consumer on the direct-reply pseudo-queue and doesn't cancel it when the connection fails. The RpcClient is then unusable by design (it nullifies the consumer on connection failure). Nevertheless, the consumer must be cancelled after recovery to be able to create a new RpcClient on the same channel. Disabling topology recovery makes this work: the consumer isn't re-registered during topology recovery and a new RpcClient can be created on the same (recovered) channel. References #382
1 parent 7a6a513 commit 1abb5c2

File tree

1 file changed

+54
-2
lines changed

1 file changed

+54
-2
lines changed

src/test/java/com/rabbitmq/client/test/RpcTest.java

Lines changed: 54 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,16 +17,21 @@
1717
package com.rabbitmq.client.test;
1818

1919
import com.rabbitmq.client.*;
20+
import com.rabbitmq.client.impl.NetworkConnection;
21+
import com.rabbitmq.client.impl.recovery.AutorecoveringConnection;
22+
import com.rabbitmq.tools.Host;
2023
import org.junit.After;
2124
import org.junit.Before;
2225
import org.junit.Test;
2326

2427
import java.io.IOException;
25-
import java.util.Collections;
2628
import java.util.HashMap;
2729
import java.util.Map;
30+
import java.util.concurrent.CountDownLatch;
31+
import java.util.concurrent.TimeUnit;
2832

2933
import static org.junit.Assert.assertEquals;
34+
import static org.junit.Assert.assertTrue;
3035

3136
public class RpcTest {
3237

@@ -35,7 +40,6 @@ public class RpcTest {
3540
String queue = "rpc.queue";
3641
RpcServer rpcServer;
3742

38-
3943
@Before public void init() throws Exception {
4044
clientConnection = TestUtils.connectionFactory().newConnection();
4145
clientChannel = clientConnection.createChannel();
@@ -77,6 +81,54 @@ public void run() {
7781
client.close();
7882
}
7983

84+
@Test public void brokenAfterBrokerRestart() throws Exception {
85+
// see https://github.com/rabbitmq/rabbitmq-java-client/issues/382
86+
rpcServer = new TestRpcServer(serverChannel, queue);
87+
new Thread(new Runnable() {
88+
@Override
89+
public void run() {
90+
try {
91+
rpcServer.mainloop();
92+
} catch (Exception e) {
93+
// safe to ignore when loops ends/server is canceled
94+
}
95+
}
96+
}).start();
97+
98+
ConnectionFactory cf = TestUtils.connectionFactory();
99+
cf.setTopologyRecoveryEnabled(false);
100+
cf.setNetworkRecoveryInterval(2000);
101+
Connection connection = null;
102+
try {
103+
connection = cf.newConnection();
104+
Channel channel = connection.createChannel();
105+
RpcClient client = new RpcClient(channel, "", queue, 1000);
106+
RpcClient.Response response = client.doCall(null, "hello".getBytes());
107+
assertEquals("*** hello ***", new String(response.getBody()));
108+
final CountDownLatch recoveryLatch = new CountDownLatch(1);
109+
((AutorecoveringConnection) connection).addRecoveryListener(new RecoveryListener() {
110+
@Override
111+
public void handleRecovery(Recoverable recoverable) {
112+
recoveryLatch.countDown();
113+
}
114+
@Override
115+
public void handleRecoveryStarted(Recoverable recoverable) {
116+
117+
}
118+
});
119+
Host.closeConnection((NetworkConnection) connection);
120+
assertTrue("Connection should have recovered by now", recoveryLatch.await(10, TimeUnit.SECONDS));
121+
client = new RpcClient(channel, "", queue, 1000);
122+
response = client.doCall(null, "hello".getBytes());
123+
assertEquals("*** hello ***", new String(response.getBody()));
124+
} finally {
125+
if (connection != null) {
126+
connection.close();
127+
}
128+
}
129+
130+
}
131+
80132
private static class TestRpcServer extends RpcServer {
81133

82134
public TestRpcServer(Channel channel, String queueName) throws IOException {

0 commit comments

Comments
 (0)