Skip to content

Commit 96cf2fd

Browse files
committed
Polish recovery retry
Add log in default retry handler, add operation to recover all the bindings of a queue (useful when the recovery of a consumer fails because isn't found), make AutorecoveringConnection#recoverConsumer and AutorecoveringConnection#recoverQueue public as they contain useful logic that some client code should be able to use, and declared a pre-configured retry handler for the deleted queue case. References #387 (cherry picked from commit 2b8d257) Conflicts: src/main/java/com/rabbitmq/client/impl/recovery/DefaultRetryHandler.java src/main/java/com/rabbitmq/client/impl/recovery/TopologyRecoveryRetryLogic.java
1 parent 5eef5cc commit 96cf2fd

File tree

4 files changed

+71
-29
lines changed

4 files changed

+71
-29
lines changed

src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringConnection.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -718,7 +718,7 @@ public Void call() throws Exception {
718718
}
719719

720720

721-
void recoverQueue(final String oldName, RecordedQueue q, boolean retry) {
721+
public void recoverQueue(final String oldName, RecordedQueue q, boolean retry) {
722722
try {
723723
if (topologyRecoveryFilter.filterQueue(q)) {
724724
LOGGER.debug("Recovering {}", q);
@@ -789,7 +789,7 @@ public Void call() throws Exception {
789789
}
790790
}
791791

792-
private void recoverConsumer(final String tag, RecordedConsumer consumer, boolean retry) {
792+
public void recoverConsumer(final String tag, RecordedConsumer consumer, boolean retry) {
793793
try {
794794
if (this.topologyRecoveryFilter.filterConsumer(consumer)) {
795795
LOGGER.debug("Recovering {}", consumer);
@@ -1110,6 +1110,10 @@ public Map<String, RecordedExchange> getRecordedExchanges() {
11101110
return recordedExchanges;
11111111
}
11121112

1113+
public List<RecordedBinding> getRecordedBindings() {
1114+
return recordedBindings;
1115+
}
1116+
11131117
@Override
11141118
public String toString() {
11151119
return this.delegate.toString();

src/main/java/com/rabbitmq/client/impl/recovery/DefaultRetryHandler.java

Lines changed: 10 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,9 @@
1515

1616
package com.rabbitmq.client.impl.recovery;
1717

18+
import org.slf4j.Logger;
19+
import org.slf4j.LoggerFactory;
20+
1821
/**
1922
* Composable topology recovery retry handler.
2023
* This retry handler implementations let the user choose the condition
@@ -32,6 +35,8 @@
3235
*/
3336
public class DefaultRetryHandler implements RetryHandler {
3437

38+
private static final Logger LOGGER = LoggerFactory.getLogger(DefaultRetryHandler.class);
39+
3540
private final RetryCondition<? super RecordedQueue> queueRecoveryRetryCondition;
3641
private final RetryCondition<? super RecordedExchange> exchangeRecoveryRetryCondition;
3742
private final RetryCondition<? super RecordedBinding> bindingRecoveryRetryCondition;
@@ -91,6 +96,7 @@ public RetryResult retryConsumerRecovery(RetryContext context) throws Exception
9196

9297
protected <T extends RecordedEntity> RetryResult doRetry(RetryCondition<T> condition, RetryOperation<?> operation, T entity, RetryContext context)
9398
throws Exception {
99+
log(entity, context.exception());
94100
int attempts = 0;
95101
Exception exception = context.exception();
96102
while (attempts < retryAttempts) {
@@ -112,6 +118,10 @@ protected <T extends RecordedEntity> RetryResult doRetry(RetryCondition<T> condi
112118
throw context.exception();
113119
}
114120

121+
protected void log(RecordedEntity entity, Exception exception) {
122+
LOGGER.info("Error while recovering {}, retrying with {} attempt(s).", entity, retryAttempts, exception);
123+
}
124+
115125
public static abstract class RetryOperation<T> {
116126

117127
public abstract T call(RetryContext context) throws Exception;
@@ -157,18 +167,5 @@ public boolean test(E entity, Exception ex) {
157167
}
158168
};
159169
}
160-
161-
public RetryCondition<E> or(final RetryCondition<? super E> other) {
162-
if (other == null) {
163-
throw new IllegalArgumentException("Condition cannot be null");
164-
}
165-
return new RetryCondition<E>() {
166-
167-
@Override
168-
public boolean test(E entity, Exception ex) {
169-
return RetryCondition.this.test(entity, ex) || other.test(entity, ex);
170-
}
171-
};
172-
}
173170
}
174171
}

src/main/java/com/rabbitmq/client/impl/recovery/TopologyRecoveryRetryLogic.java

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
import com.rabbitmq.client.AMQP;
1919
import com.rabbitmq.client.ShutdownSignalException;
2020

21+
import static com.rabbitmq.client.impl.recovery.TopologyRecoveryRetryHandlerBuilder.builder;
22+
2123
/**
2224
* Useful ready-to-use conditions and operations for {@link DefaultRetryHandler}.
2325
* They're composed and used with the {@link TopologyRecoveryRetryHandlerBuilder}.
@@ -29,6 +31,9 @@
2931
*/
3032
public abstract class TopologyRecoveryRetryLogic {
3133

34+
/**
35+
* Channel has been closed because of a resource that doesn't exist.
36+
*/
3237
public static final DefaultRetryHandler.RetryCondition<RecordedEntity> CHANNEL_CLOSED_NOT_FOUND = new DefaultRetryHandler.RetryCondition<RecordedEntity>() {
3338

3439
@Override
@@ -43,6 +48,9 @@ public boolean test(RecordedEntity entity, Exception e) {
4348
}
4449
};
4550

51+
/**
52+
* Recover a channel.
53+
*/
4654
public static final DefaultRetryHandler.RetryOperation<Void> RECOVER_CHANNEL = new DefaultRetryHandler.RetryOperation<Void>() {
4755

4856
@Override
@@ -54,6 +62,9 @@ public Void call(RetryContext context) throws Exception {
5462
}
5563
};
5664

65+
/**
66+
* Recover the destination queue of a binding.
67+
*/
5768
public static final DefaultRetryHandler.RetryOperation<Void> RECOVER_BINDING_QUEUE = new DefaultRetryHandler.RetryOperation<Void>() {
5869

5970
@Override
@@ -72,6 +83,9 @@ public Void call(RetryContext context) {
7283
}
7384
};
7485

86+
/**
87+
* Recover a binding.
88+
*/
7589
public static final DefaultRetryHandler.RetryOperation<Void> RECOVER_BINDING = new DefaultRetryHandler.RetryOperation<Void>() {
7690

7791
@Override
@@ -81,6 +95,9 @@ public Void call(RetryContext context) throws Exception {
8195
}
8296
};
8397

98+
/**
99+
* Recover the queue of a consumer.
100+
*/
84101
public static final DefaultRetryHandler.RetryOperation<Void> RECOVER_CONSUMER_QUEUE = new DefaultRetryHandler.RetryOperation<Void>() {
85102

86103
@Override
@@ -99,11 +116,47 @@ public Void call(RetryContext context) {
99116
}
100117
};
101118

119+
/**
120+
* Recover a consumer.
121+
*/
102122
public static final DefaultRetryHandler.RetryOperation<String> RECOVER_CONSUMER = new DefaultRetryHandler.RetryOperation<String>() {
103123

104124
@Override
105125
public String call(RetryContext context) throws Exception {
106126
return context.consumer().recover();
107127
}
108128
};
129+
130+
/**
131+
* Recover all the bindings of the queue of a consumer.
132+
*/
133+
public static final DefaultRetryHandler.RetryOperation<Void> RECOVER_CONSUMER_QUEUE_BINDINGS = new DefaultRetryHandler.RetryOperation<Void>() {
134+
135+
@Override
136+
public Void call(RetryContext context) throws Exception {
137+
if (context.entity() instanceof RecordedConsumer) {
138+
String queue = context.consumer().getQueue();
139+
for (RecordedBinding recordedBinding : context.connection().getRecordedBindings()) {
140+
if (recordedBinding instanceof RecordedQueueBinding && queue.equals(recordedBinding.getDestination())) {
141+
recordedBinding.recover();
142+
}
143+
}
144+
}
145+
return null;
146+
}
147+
};
148+
149+
/**
150+
* Pre-configured {@link DefaultRetryHandler} that retries recovery of bindings and consumers
151+
* when their respective queue is not found.
152+
* This retry handler can be useful for long recovery processes, whereby auto-delete queues
153+
* can be deleted between queue recovery and binding/consumer recovery.
154+
*/
155+
public static final RetryHandler RETRY_ON_QUEUE_NOT_FOUND_RETRY_HANDLER = builder()
156+
.bindingRecoveryRetryCondition(CHANNEL_CLOSED_NOT_FOUND)
157+
.consumerRecoveryRetryCondition(CHANNEL_CLOSED_NOT_FOUND)
158+
.bindingRecoveryRetryOperation(RECOVER_CHANNEL.andThen(RECOVER_BINDING_QUEUE).andThen(RECOVER_BINDING))
159+
.consumerRecoveryRetryOperation(RECOVER_CHANNEL.andThen(RECOVER_CONSUMER_QUEUE.andThen(RECOVER_CONSUMER)
160+
.andThen(RECOVER_CONSUMER_QUEUE_BINDINGS)))
161+
.build();
109162
}

src/test/java/com/rabbitmq/client/test/functional/TopologyRecoveryRetry.java

Lines changed: 2 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -27,13 +27,7 @@
2727

2828
import java.util.HashMap;
2929

30-
import static com.rabbitmq.client.impl.recovery.TopologyRecoveryRetryHandlerBuilder.builder;
31-
import static com.rabbitmq.client.impl.recovery.TopologyRecoveryRetryLogic.CHANNEL_CLOSED_NOT_FOUND;
32-
import static com.rabbitmq.client.impl.recovery.TopologyRecoveryRetryLogic.RECOVER_BINDING;
33-
import static com.rabbitmq.client.impl.recovery.TopologyRecoveryRetryLogic.RECOVER_BINDING_QUEUE;
34-
import static com.rabbitmq.client.impl.recovery.TopologyRecoveryRetryLogic.RECOVER_CHANNEL;
35-
import static com.rabbitmq.client.impl.recovery.TopologyRecoveryRetryLogic.RECOVER_CONSUMER;
36-
import static com.rabbitmq.client.impl.recovery.TopologyRecoveryRetryLogic.RECOVER_CONSUMER_QUEUE;
30+
import static com.rabbitmq.client.impl.recovery.TopologyRecoveryRetryLogic.RETRY_ON_QUEUE_NOT_FOUND_RETRY_HANDLER;
3731
import static com.rabbitmq.client.test.TestUtils.closeAllConnectionsAndWaitForRecovery;
3832
import static org.junit.Assert.assertTrue;
3933

@@ -61,13 +55,7 @@ public void topologyRecoveryRetry() throws Exception {
6155
@Override
6256
protected ConnectionFactory newConnectionFactory() {
6357
ConnectionFactory connectionFactory = TestUtils.connectionFactory();
64-
connectionFactory.setTopologyRecoveryRetryHandler(
65-
builder().bindingRecoveryRetryCondition(CHANNEL_CLOSED_NOT_FOUND)
66-
.consumerRecoveryRetryCondition(CHANNEL_CLOSED_NOT_FOUND)
67-
.bindingRecoveryRetryOperation(RECOVER_CHANNEL.andThen(RECOVER_BINDING_QUEUE).andThen(RECOVER_BINDING))
68-
.consumerRecoveryRetryOperation(RECOVER_CHANNEL.andThen(RECOVER_CONSUMER_QUEUE.andThen(RECOVER_CONSUMER)))
69-
.build()
70-
);
58+
connectionFactory.setTopologyRecoveryRetryHandler(RETRY_ON_QUEUE_NOT_FOUND_RETRY_HANDLER);
7159
connectionFactory.setNetworkRecoveryInterval(1000);
7260
return connectionFactory;
7361
}

0 commit comments

Comments
 (0)