Skip to content

Commit b1782f0

Browse files
committed
binding failure retry should recover all queue bindings
1 parent 87661ee commit b1782f0

File tree

3 files changed

+175
-3
lines changed

3 files changed

+175
-3
lines changed

src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringConnection.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1091,6 +1091,10 @@ public Map<String, RecordedExchange> getRecordedExchanges() {
10911091
public List<RecordedBinding> getRecordedBindings() {
10921092
return recordedBindings;
10931093
}
1094+
1095+
public Map<String, RecordedConsumer> getRecordedConsumers() {
1096+
return consumers;
1097+
}
10941098

10951099
@Override
10961100
public String toString() {

src/main/java/com/rabbitmq/client/impl/recovery/TopologyRecoveryRetryLogic.java

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,28 @@ public abstract class TopologyRecoveryRetryLogic {
7777
* Recover a binding.
7878
*/
7979
public static final DefaultRetryHandler.RetryOperation<Void> RECOVER_BINDING = context -> {
80-
context.binding().recover();
80+
if (context.entity() instanceof RecordedQueueBinding) {
81+
// recover all bindings for the queue.
82+
// need to do this incase some bindings have already been recovered successfully before this binding failed
83+
String queue = context.binding().getDestination();
84+
for (RecordedBinding recordedBinding : Utility.copy(context.connection().getRecordedBindings())) {
85+
if (recordedBinding instanceof RecordedQueueBinding && queue.equals(recordedBinding.getDestination())) {
86+
recordedBinding.recover();
87+
}
88+
}
89+
} else if (context.entity() instanceof RecordedExchangeBinding) {
90+
// recover all bindings for the exchange
91+
// need to do this incase some bindings have already been recovered successfully before this binding failed
92+
String exchange = context.binding().getDestination();
93+
for (RecordedBinding recordedBinding : Utility.copy(context.connection().getRecordedBindings())) {
94+
if (recordedBinding instanceof RecordedExchangeBinding && exchange.equals(recordedBinding.getDestination())) {
95+
recordedBinding.recover();
96+
}
97+
}
98+
} else {
99+
// should't be possible to get here, but just in case recover just this binding
100+
context.binding().recover();
101+
}
81102
return null;
82103
};
83104

src/test/java/com/rabbitmq/client/test/functional/TopologyRecoveryRetry.java

Lines changed: 149 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,24 @@
1717

1818
import com.rabbitmq.client.ConnectionFactory;
1919
import com.rabbitmq.client.DefaultConsumer;
20+
import com.rabbitmq.client.Envelope;
21+
import com.rabbitmq.client.Recoverable;
22+
import com.rabbitmq.client.RecoveryListener;
23+
import com.rabbitmq.client.AMQP.BasicProperties;
24+
import com.rabbitmq.client.impl.recovery.AutorecoveringConnection;
25+
import com.rabbitmq.client.impl.recovery.RecordedBinding;
26+
import com.rabbitmq.client.impl.recovery.RecordedConsumer;
2027
import com.rabbitmq.client.test.BrokerTestCase;
2128
import com.rabbitmq.client.test.TestUtils;
29+
import com.rabbitmq.tools.Host;
30+
import org.junit.After;
2231
import org.junit.Test;
23-
32+
import java.io.IOException;
2433
import java.util.HashMap;
34+
import java.util.UUID;
35+
import java.util.concurrent.CountDownLatch;
36+
import java.util.concurrent.TimeUnit;
37+
import java.util.function.Consumer;
2538

2639
import static com.rabbitmq.client.impl.recovery.TopologyRecoveryRetryLogic.RETRY_ON_QUEUE_NOT_FOUND_RETRY_HANDLER;
2740
import static com.rabbitmq.client.test.TestUtils.closeAllConnectionsAndWaitForRecovery;
@@ -32,6 +45,13 @@
3245
*/
3346
public class TopologyRecoveryRetry extends BrokerTestCase {
3447

48+
private Consumer<Integer> backoffConsumer;
49+
50+
@After
51+
public void cleanup() {
52+
backoffConsumer = null;
53+
}
54+
3555
@Test
3656
public void topologyRecoveryRetry() throws Exception {
3757
int nbQueues = 200;
@@ -40,18 +60,145 @@ public void topologyRecoveryRetry() throws Exception {
4060
String queue = prefix + i;
4161
channel.queueDeclare(queue, false, false, true, new HashMap<>());
4262
channel.queueBind(queue, "amq.direct", queue);
63+
channel.queueBind(queue, "amq.direct", queue + "2");
4364
channel.basicConsume(queue, true, new DefaultConsumer(channel));
4465
}
4566

4667
closeAllConnectionsAndWaitForRecovery(this.connection);
4768

4869
assertTrue(channel.isOpen());
4970
}
71+
72+
@Test
73+
public void topologyRecoveryBindingFailure() throws Exception {
74+
final String queue = "topology-recovery-retry-binding-failure" + System.currentTimeMillis();
75+
channel.queueDeclare(queue, false, false, true, new HashMap<>());
76+
channel.queueBind(queue, "amq.topic", "topic1");
77+
channel.queueBind(queue, "amq.topic", "topic2");
78+
final CountDownLatch messagesReceivedLatch = new CountDownLatch(2);
79+
channel.basicConsume(queue, true, new DefaultConsumer(channel) {
80+
@Override
81+
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException {
82+
System.out.println("Got message=" + new String(body));
83+
messagesReceivedLatch.countDown();
84+
}
85+
});
86+
final CountDownLatch recoveryLatch = new CountDownLatch(1);
87+
((AutorecoveringConnection)connection).addRecoveryListener(new RecoveryListener() {
88+
@Override
89+
public void handleRecoveryStarted(Recoverable recoverable) {
90+
// no-op
91+
}
92+
@Override
93+
public void handleRecovery(Recoverable recoverable) {
94+
recoveryLatch.countDown();
95+
}
96+
});
97+
98+
// we want recovery to fail when recovering the 2nd binding
99+
// give the 2nd recorded binding a bad queue name so it fails
100+
final RecordedBinding binding2 = ((AutorecoveringConnection)connection).getRecordedBindings().get(1);
101+
binding2.destination(UUID.randomUUID().toString());
102+
103+
// use the backoffConsumer to know that it has failed
104+
// then delete the real queue & fix the recorded binding
105+
// it should fail once more because queue is gone, and then succeed
106+
final CountDownLatch backoffLatch = new CountDownLatch(1);
107+
backoffConsumer = attempt -> {
108+
binding2.destination(queue);
109+
try {
110+
Host.rabbitmqctl("delete_queue " + queue);
111+
Thread.sleep(2000);
112+
} catch (Exception e) {
113+
e.printStackTrace();
114+
}
115+
backoffLatch.countDown();
116+
};
117+
118+
// close connection
119+
Host.closeAllConnections();
120+
121+
// assert backoff was called
122+
assertTrue(backoffLatch.await(90, TimeUnit.SECONDS));
123+
// wait for full recovery
124+
assertTrue(recoveryLatch.await(90, TimeUnit.SECONDS));
125+
126+
// publish messages to verify both bindings were recovered
127+
basicPublishVolatile("test1".getBytes(), "amq.topic", "topic1");
128+
basicPublishVolatile("test2".getBytes(), "amq.topic", "topic2");
129+
130+
assertTrue(messagesReceivedLatch.await(10, TimeUnit.SECONDS));
131+
}
132+
133+
@Test
134+
public void topologyRecoveryConsumerFailure() throws Exception {
135+
final String queue = "topology-recovery-retry-consumer-failure" + System.currentTimeMillis();
136+
channel.queueDeclare(queue, false, false, true, new HashMap<>());
137+
channel.queueBind(queue, "amq.topic", "topic1");
138+
channel.queueBind(queue, "amq.topic", "topic2");
139+
final CountDownLatch messagesReceivedLatch = new CountDownLatch(2);
140+
channel.basicConsume(queue, true, new DefaultConsumer(channel) {
141+
@Override
142+
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException {
143+
System.out.println("Got message=" + new String(body));
144+
messagesReceivedLatch.countDown();
145+
}
146+
});
147+
final CountDownLatch recoveryLatch = new CountDownLatch(1);
148+
((AutorecoveringConnection)connection).addRecoveryListener(new RecoveryListener() {
149+
@Override
150+
public void handleRecoveryStarted(Recoverable recoverable) {
151+
// no-op
152+
}
153+
@Override
154+
public void handleRecovery(Recoverable recoverable) {
155+
recoveryLatch.countDown();
156+
}
157+
});
158+
159+
// we want recovery to fail when recovering the consumer
160+
// give the recorded consumer a bad queue name so it fails
161+
final RecordedConsumer consumer = ((AutorecoveringConnection)connection).getRecordedConsumers().values().iterator().next();
162+
consumer.setQueue(UUID.randomUUID().toString());
163+
164+
// use the backoffConsumer to know that it has failed
165+
// then delete the real queue & fix the recorded consumer
166+
// it should fail once more because queue is gone, and then succeed
167+
final CountDownLatch backoffLatch = new CountDownLatch(1);
168+
backoffConsumer = attempt -> {
169+
consumer.setQueue(queue);
170+
try {
171+
Host.rabbitmqctl("delete_queue " + queue);
172+
Thread.sleep(2000);
173+
} catch (Exception e) {
174+
e.printStackTrace();
175+
}
176+
backoffLatch.countDown();
177+
};
178+
179+
// close connection
180+
Host.closeAllConnections();
181+
182+
// assert backoff was called
183+
assertTrue(backoffLatch.await(90, TimeUnit.SECONDS));
184+
// wait for full recovery
185+
assertTrue(recoveryLatch.await(90, TimeUnit.SECONDS));
186+
187+
// publish messages to verify both bindings & consumer were recovered
188+
basicPublishVolatile("test1".getBytes(), "amq.topic", "topic1");
189+
basicPublishVolatile("test2".getBytes(), "amq.topic", "topic2");
190+
191+
assertTrue(messagesReceivedLatch.await(10, TimeUnit.SECONDS));
192+
}
50193

51194
@Override
52195
protected ConnectionFactory newConnectionFactory() {
53196
ConnectionFactory connectionFactory = TestUtils.connectionFactory();
54-
connectionFactory.setTopologyRecoveryRetryHandler(RETRY_ON_QUEUE_NOT_FOUND_RETRY_HANDLER.build());
197+
connectionFactory.setTopologyRecoveryRetryHandler(RETRY_ON_QUEUE_NOT_FOUND_RETRY_HANDLER.backoffPolicy(attempt -> {
198+
if (backoffConsumer != null) {
199+
backoffConsumer.accept(attempt);
200+
}
201+
}).build());
55202
connectionFactory.setNetworkRecoveryInterval(1000);
56203
return connectionFactory;
57204
}

0 commit comments

Comments
 (0)