Skip to content

Commit ed69420

Browse files
committed
Enable automatic recovery by default (WIP)
Fixes #210
1 parent 35bdc03 commit ed69420

File tree

7 files changed

+35
-41
lines changed

7 files changed

+35
-41
lines changed

src/main/java/com/rabbitmq/client/ConnectionFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ public class ConnectionFactory implements Cloneable {
100100
private SocketConfigurator socketConf = new DefaultSocketConfigurator();
101101
private ExceptionHandler exceptionHandler = new DefaultExceptionHandler();
102102

103-
private boolean automaticRecovery = false;
103+
private boolean automaticRecovery = true;
104104
private boolean topologyRecovery = true;
105105

106106
// long is used to make sure the users can use both ints

src/main/java/com/rabbitmq/client/impl/recovery/RecoveryAwareChannelN.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -82,16 +82,20 @@ private AMQImpl.Basic.Deliver offsetDeliveryTag(AMQImpl.Basic.Deliver method) {
8282

8383
@Override
8484
public void basicAck(long deliveryTag, boolean multiple) throws IOException {
85+
// FIXME no check if deliveryTag = 0 (ack all)
8586
long realTag = deliveryTag - activeDeliveryTagOffset;
86-
if (realTag > 0) {
87+
// 0 tag means ack all
88+
if (realTag >= 0) {
8789
super.basicAck(realTag, multiple);
8890
}
8991
}
9092

9193
@Override
9294
public void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException {
95+
// FIXME no check if deliveryTag = 0 (nack all)
9396
long realTag = deliveryTag - activeDeliveryTagOffset;
94-
if (realTag > 0) {
97+
// 0 tag means nack all
98+
if (realTag >= 0) {
9599
super.basicNack(realTag, multiple, requeue);
96100
}
97101
}

src/test/java/com/rabbitmq/client/test/SharedThreadPoolTest.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
public class SharedThreadPoolTest extends BrokerTestCase {
3232
@Test public void willShutDownExecutor() throws IOException, TimeoutException {
3333
ConnectionFactory cf = TestUtils.connectionFactory();
34+
cf.setAutomaticRecoveryEnabled(false);
3435
ExecutorService executor = Executors.newFixedThreadPool(8);
3536
cf.setSharedExecutor(executor);
3637

src/test/java/com/rabbitmq/client/test/functional/Heartbeat.java

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,16 +16,13 @@
1616

1717
package com.rabbitmq.client.test.functional;
1818

19-
import static org.junit.Assert.assertEquals;
20-
import static org.junit.Assert.assertFalse;
21-
import static org.junit.Assert.assertTrue;
19+
import com.rabbitmq.client.impl.recovery.AutorecoveringConnection;
20+
import com.rabbitmq.client.test.BrokerTestCase;
21+
import org.junit.Test;
2222

2323
import java.io.IOException;
2424

25-
import org.junit.Test;
26-
27-
import com.rabbitmq.client.impl.AMQConnection;
28-
import com.rabbitmq.client.test.BrokerTestCase;
25+
import static org.junit.Assert.*;
2926

3027
public class Heartbeat extends BrokerTestCase {
3128

@@ -41,7 +38,7 @@ public Heartbeat()
4138
assertEquals(1, connection.getHeartbeat());
4239
Thread.sleep(3100);
4340
assertTrue(connection.isOpen());
44-
((AMQConnection)connection).setHeartbeat(0);
41+
((AutorecoveringConnection)connection).getDelegate().setHeartbeat(0);
4542
assertEquals(0, connection.getHeartbeat());
4643
Thread.sleep(3100);
4744
assertFalse(connection.isOpen());

src/test/java/com/rabbitmq/client/test/functional/UnexpectedFrames.java

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -15,19 +15,18 @@
1515

1616
package com.rabbitmq.client.test.functional;
1717

18-
import java.io.IOException;
19-
import java.net.Socket;
20-
21-
import javax.net.SocketFactory;
22-
23-
import com.rabbitmq.client.impl.*;
24-
import com.rabbitmq.client.test.TestUtils;
25-
import org.junit.Test;
26-
2718
import com.rabbitmq.client.AMQP;
2819
import com.rabbitmq.client.ConnectionFactory;
2920
import com.rabbitmq.client.DefaultSocketConfigurator;
21+
import com.rabbitmq.client.impl.*;
22+
import com.rabbitmq.client.impl.recovery.AutorecoveringConnection;
3023
import com.rabbitmq.client.test.BrokerTestCase;
24+
import com.rabbitmq.client.test.TestUtils;
25+
import org.junit.Test;
26+
27+
import javax.net.SocketFactory;
28+
import java.io.IOException;
29+
import java.net.Socket;
3130

3231
/**
3332
* Test that the server correctly handles us when we send it bad frames
@@ -177,7 +176,7 @@ public Frame confuse(Frame frame) {
177176
}
178177

179178
private void expectError(int error, Confuser confuser) throws IOException {
180-
((ConfusedFrameHandler)((AMQConnection)connection).getFrameHandler()).
179+
((ConfusedFrameHandler)((AutorecoveringConnection)connection).getDelegate().getFrameHandler()).
181180
confuser = confuser;
182181

183182
//NB: the frame confuser relies on the encoding of the

src/test/java/com/rabbitmq/client/test/server/ChannelLimitNegotiation.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525

2626
import javax.net.SocketFactory;
2727

28+
import com.rabbitmq.client.impl.recovery.AutorecoveringConnection;
2829
import com.rabbitmq.client.test.TestUtils;
2930
import org.junit.Test;
3031

@@ -103,7 +104,7 @@ protected int negotiateChannelMax(int requestedChannelMax, int serverMax) {
103104
assertNull(conn.createChannel(n + 1));
104105

105106
// Construct a channel directly
106-
final ChannelN ch = new ChannelN((AMQConnection) conn, n + 1,
107+
final ChannelN ch = new ChannelN(((AutorecoveringConnection) conn).getDelegate(), n + 1,
107108
new ConsumerWorkService(Executors.newSingleThreadExecutor(),
108109
Executors.defaultThreadFactory(), ConnectionFactory.DEFAULT_SHUTDOWN_TIMEOUT));
109110
conn.addShutdownListener(new ShutdownListener() {

src/test/java/com/rabbitmq/client/test/server/Permissions.java

Lines changed: 11 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -16,28 +16,20 @@
1616

1717
package com.rabbitmq.client.test.server;
1818

19-
import static org.junit.Assert.assertFalse;
20-
import static org.junit.Assert.assertTrue;
21-
import static org.junit.Assert.fail;
19+
import com.rabbitmq.client.*;
20+
import com.rabbitmq.client.impl.AMQChannel;
21+
import com.rabbitmq.client.impl.recovery.AutorecoveringChannel;
22+
import com.rabbitmq.client.test.BrokerTestCase;
23+
import com.rabbitmq.client.test.TestUtils;
24+
import com.rabbitmq.tools.Host;
25+
import org.junit.Test;
2226

2327
import java.io.IOException;
2428
import java.util.HashMap;
2529
import java.util.Map;
2630
import java.util.concurrent.TimeoutException;
2731

28-
import com.rabbitmq.client.test.TestUtils;
29-
import org.junit.Test;
30-
31-
import com.rabbitmq.client.AMQP;
32-
import com.rabbitmq.client.AlreadyClosedException;
33-
import com.rabbitmq.client.AuthenticationFailureException;
34-
import com.rabbitmq.client.Channel;
35-
import com.rabbitmq.client.Connection;
36-
import com.rabbitmq.client.ConnectionFactory;
37-
import com.rabbitmq.client.QueueingConsumer;
38-
import com.rabbitmq.client.impl.AMQChannel;
39-
import com.rabbitmq.client.test.BrokerTestCase;
40-
import com.rabbitmq.tools.Host;
32+
import static org.junit.Assert.*;
4133

4234
public class Permissions extends BrokerTestCase
4335
{
@@ -222,9 +214,9 @@ public void with(String name) throws IOException {
222214
{
223215
runTest(false, false, true, false, new WithName() {
224216
public void with(String name) throws IOException {
225-
((AMQChannel)channel)
226-
.exnWrappingRpc(new AMQP.Queue.Purge.Builder()
227-
.queue(name)
217+
AMQChannel channelDelegate = (AMQChannel) ((AutorecoveringChannel)channel).getDelegate();
218+
channelDelegate.exnWrappingRpc(new AMQP.Queue.Purge.Builder()
219+
.queue(name)
228220
.build());
229221
}});
230222
}

0 commit comments

Comments
 (0)