1313// If you have any questions regarding licensing, please contact us at
1414// info@rabbitmq.com.
1515
16-
1716package com .rabbitmq .client .test ;
1817
19- import com .rabbitmq .client .*;
18+ import com .rabbitmq .client .AMQP ;
19+ import com .rabbitmq .client .Channel ;
20+ import com .rabbitmq .client .Connection ;
21+ import com .rabbitmq .client .ConnectionFactory ;
22+ import com .rabbitmq .client .QueueingConsumer ;
23+ import com .rabbitmq .client .Recoverable ;
24+ import com .rabbitmq .client .RecoveryListener ;
25+ import com .rabbitmq .client .RpcClient ;
26+ import com .rabbitmq .client .RpcServer ;
27+ import com .rabbitmq .client .ShutdownSignalException ;
2028import com .rabbitmq .client .impl .NetworkConnection ;
2129import com .rabbitmq .client .impl .recovery .AutorecoveringConnection ;
30+ import com .rabbitmq .client .impl .recovery .RecordedBinding ;
31+ import com .rabbitmq .client .impl .recovery .RecordedConsumer ;
32+ import com .rabbitmq .client .impl .recovery .RecordedExchange ;
33+ import com .rabbitmq .client .impl .recovery .RecordedQueue ;
34+ import com .rabbitmq .client .impl .recovery .TopologyRecoveryFilter ;
2235import com .rabbitmq .tools .Host ;
2336import org .junit .After ;
2437import org .junit .Before ;
3245
3346import static org .junit .Assert .assertEquals ;
3447import static org .junit .Assert .assertTrue ;
48+ import static org .junit .Assert .fail ;
3549
3650public class RpcTest {
3751
@@ -40,19 +54,21 @@ public class RpcTest {
4054 String queue = "rpc.queue" ;
4155 RpcServer rpcServer ;
4256
43- @ Before public void init () throws Exception {
57+ @ Before
58+ public void init () throws Exception {
4459 clientConnection = TestUtils .connectionFactory ().newConnection ();
4560 clientChannel = clientConnection .createChannel ();
4661 serverConnection = TestUtils .connectionFactory ().newConnection ();
4762 serverChannel = serverConnection .createChannel ();
4863 serverChannel .queueDeclare (queue , false , false , false , null );
4964 }
5065
51- @ After public void tearDown () throws Exception {
52- if (rpcServer != null ) {
66+ @ After
67+ public void tearDown () throws Exception {
68+ if (rpcServer != null ) {
5369 rpcServer .terminateMainloop ();
5470 }
55- if (serverChannel != null ) {
71+ if (serverChannel != null ) {
5672 serverChannel .queueDelete (queue );
5773 }
5874 clientConnection .close ();
@@ -63,6 +79,7 @@ public class RpcTest {
6379 public void rpc () throws Exception {
6480 rpcServer = new TestRpcServer (serverChannel , queue );
6581 new Thread (new Runnable () {
82+
6683 @ Override
6784 public void run () {
6885 try {
@@ -81,10 +98,12 @@ public void run() {
8198 client .close ();
8299 }
83100
84- @ Test public void brokenAfterBrokerRestart () throws Exception {
101+ @ Test
102+ public void givenConsumerNotRecoveredCanCreateNewClientOnSameChannelAfterConnectionFailure () throws Exception {
85103 // see https://github.com/rabbitmq/rabbitmq-java-client/issues/382
86104 rpcServer = new TestRpcServer (serverChannel , queue );
87105 new Thread (new Runnable () {
106+
88107 @ Override
89108 public void run () {
90109 try {
@@ -96,8 +115,8 @@ public void run() {
96115 }).start ();
97116
98117 ConnectionFactory cf = TestUtils .connectionFactory ();
99- cf .setTopologyRecoveryEnabled ( false );
100- cf .setNetworkRecoveryInterval (2000 );
118+ cf .setTopologyRecoveryFilter ( new NoDirectReplyToConsumerTopologyRecoveryFilter () );
119+ cf .setNetworkRecoveryInterval (1000 );
101120 Connection connection = null ;
102121 try {
103122 connection = cf .newConnection ();
@@ -107,10 +126,12 @@ public void run() {
107126 assertEquals ("*** hello ***" , new String (response .getBody ()));
108127 final CountDownLatch recoveryLatch = new CountDownLatch (1 );
109128 ((AutorecoveringConnection ) connection ).addRecoveryListener (new RecoveryListener () {
129+
110130 @ Override
111131 public void handleRecovery (Recoverable recoverable ) {
112132 recoveryLatch .countDown ();
113133 }
134+
114135 @ Override
115136 public void handleRecoveryStarted (Recoverable recoverable ) {
116137
@@ -126,7 +147,62 @@ public void handleRecoveryStarted(Recoverable recoverable) {
126147 connection .close ();
127148 }
128149 }
150+ }
151+
152+ @ Test
153+ public void givenConsumerIsRecoveredCanNotCreateNewClientOnSameChannelAfterConnectionFailure () throws Exception {
154+ // see https://github.com/rabbitmq/rabbitmq-java-client/issues/382
155+ rpcServer = new TestRpcServer (serverChannel , queue );
156+ new Thread (new Runnable () {
157+
158+ @ Override
159+ public void run () {
160+ try {
161+ rpcServer .mainloop ();
162+ } catch (Exception e ) {
163+ // safe to ignore when loops ends/server is canceled
164+ }
165+ }
166+ }).start ();
167+
168+ ConnectionFactory cf = TestUtils .connectionFactory ();
169+ cf .setNetworkRecoveryInterval (1000 );
170+ Connection connection = null ;
171+ try {
172+ connection = cf .newConnection ();
173+ Channel channel = connection .createChannel ();
174+ RpcClient client = new RpcClient (channel , "" , queue , 1000 );
175+ RpcClient .Response response = client .doCall (null , "hello" .getBytes ());
176+ assertEquals ("*** hello ***" , new String (response .getBody ()));
177+ final CountDownLatch recoveryLatch = new CountDownLatch (1 );
178+ ((AutorecoveringConnection ) connection ).addRecoveryListener (new RecoveryListener () {
129179
180+ @ Override
181+ public void handleRecovery (Recoverable recoverable ) {
182+ recoveryLatch .countDown ();
183+ }
184+
185+ @ Override
186+ public void handleRecoveryStarted (Recoverable recoverable ) {
187+
188+ }
189+ });
190+ Host .closeConnection ((NetworkConnection ) connection );
191+ assertTrue ("Connection should have recovered by now" , recoveryLatch .await (10 , TimeUnit .SECONDS ));
192+ try {
193+ new RpcClient (channel , "" , queue , 1000 );
194+ fail ("Cannot create RPC client on same channel, an exception should have been thrown" );
195+ } catch (IOException e ) {
196+ assertTrue (e .getCause () instanceof ShutdownSignalException );
197+ ShutdownSignalException cause = (ShutdownSignalException ) e .getCause ();
198+ assertTrue (cause .getReason () instanceof AMQP .Channel .Close );
199+ assertEquals (406 , ((AMQP .Channel .Close ) cause .getReason ()).getReplyCode ());
200+ }
201+ } finally {
202+ if (connection != null ) {
203+ connection .close ();
204+ }
205+ }
130206 }
131207
132208 private static class TestRpcServer extends RpcServer {
@@ -157,4 +233,27 @@ protected AMQP.BasicProperties postprocessReplyProperties(QueueingConsumer.Deliv
157233 return builder .build ();
158234 }
159235 }
236+
237+ private static class NoDirectReplyToConsumerTopologyRecoveryFilter implements TopologyRecoveryFilter {
238+
239+ @ Override
240+ public boolean filterExchange (RecordedExchange recordedExchange ) {
241+ return true ;
242+ }
243+
244+ @ Override
245+ public boolean filterQueue (RecordedQueue recordedQueue ) {
246+ return true ;
247+ }
248+
249+ @ Override
250+ public boolean filterBinding (RecordedBinding recordedBinding ) {
251+ return true ;
252+ }
253+
254+ @ Override
255+ public boolean filterConsumer (RecordedConsumer recordedConsumer ) {
256+ return !"amq.rabbitmq.reply-to" .equals (recordedConsumer .getQueue ());
257+ }
258+ }
160259}
0 commit comments