Skip to content

Commit e324bf9

Browse files
committed
Trigger recovery if a socket write fails
Detecting a connection failure on read can take a long time (or forever, if the reading thread is stuck), so connection recovery can now also be triggered when a write operation fails. This lets the client detect failing connections sooner. References #341
1 parent fe5b196 commit e324bf9

File tree

5 files changed

+265
-2
lines changed

5 files changed

+265
-2
lines changed

src/main/java/com/rabbitmq/client/impl/AMQConnection.java

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,8 @@ public class AMQConnection extends ShutdownNotifierComponent implements Connecti
6161
private final List<RecoveryCanBeginListener> recoveryCanBeginListeners =
6262
Collections.synchronizedList(new ArrayList<RecoveryCanBeginListener>());
6363

64+
private final ErrorOnWriteListener errorOnWriteListener;
65+
6466
/**
6567
* Retrieve a copy of the default table of client properties that
6668
* will be sent to the server during connection startup. This
@@ -245,6 +247,13 @@ public AMQConnection(ConnectionParams params, FrameHandler frameHandler, Metrics
245247
this._inConnectionNegotiation = true; // we start out waiting for the first protocol response
246248

247249
this.metricsCollector = metricsCollector;
250+
251+
this.errorOnWriteListener = params.getErrorOnWriteListener() != null ? params.getErrorOnWriteListener() :
252+
new ErrorOnWriteListener() {
253+
@Override
254+
public void handle(Connection connection, Throwable exception) { }
255+
};
256+
248257
}
249258

250259
private void initializeConsumerWorkService() {
@@ -556,7 +565,11 @@ public void writeFrame(Frame f) throws IOException {
556565
* Public API - flush the output buffers
557566
*/
558567
public void flush() throws IOException {
559-
_frameHandler.flush();
568+
try {
569+
_frameHandler.flush();
570+
} catch (Throwable throwable) {
571+
this.errorOnWriteListener.handle(this, throwable);
572+
}
560573
}
561574

562575
private static int negotiatedMaxValue(int clientValue, int serverValue) {
@@ -879,7 +892,6 @@ private ShutdownSignalException startShutdown(Method reason,
879892
_heartbeatSender.shutdown();
880893

881894
_channel0.processShutdownSignal(sse, !initiatedByApplication, notifyRpc);
882-
883895
return sse;
884896
}
885897

src/main/java/com/rabbitmq/client/impl/ConnectionParams.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ public class ConnectionParams {
4444
private boolean topologyRecovery;
4545
private int channelRpcTimeout;
4646
private boolean channelShouldCheckRpcResponseType;
47+
private ErrorOnWriteListener errorOnWriteListener;
4748

4849
private ExceptionHandler exceptionHandler;
4950
private ThreadFactory threadFactory;
@@ -213,4 +214,12 @@ public void setChannelRpcTimeout(int channelRpcTimeout) {
213214
public void setChannelShouldCheckRpcResponseType(boolean channelShouldCheckRpcResponseType) {
214215
this.channelShouldCheckRpcResponseType = channelShouldCheckRpcResponseType;
215216
}
217+
218+
/**
 * Sets the listener to be notified when a write on the connection fails.
 * {@code AMQConnection} reads this value in its constructor and falls back
 * to a no-op listener when it is {@code null}.
 *
 * @param errorOnWriteListener the listener to use, may be {@code null}
 */
public void setErrorOnWriteListener(ErrorOnWriteListener errorOnWriteListener) {
219+
this.errorOnWriteListener = errorOnWriteListener;
220+
}
221+
222+
/**
 * Returns the listener to be notified when a write on the connection fails.
 *
 * @return the configured listener, or {@code null} if none has been set
 */
public ErrorOnWriteListener getErrorOnWriteListener() {
223+
return errorOnWriteListener;
224+
}
216225
}
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
// Copyright (c) 2018-Present Pivotal Software, Inc. All rights reserved.
2+
//
3+
// This software, the RabbitMQ Java client library, is triple-licensed under the
4+
// Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2
5+
// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see
6+
// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL,
7+
// please see LICENSE-APACHE2.
8+
//
9+
// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
10+
// either express or implied. See the LICENSE file for specific language governing
11+
// rights and limitations of this software.
12+
//
13+
// If you have any questions regarding licensing, please contact us at
14+
// info@rabbitmq.com.
15+
16+
package com.rabbitmq.client.impl;
17+
18+
import com.rabbitmq.client.Connection;
19+
20+
/**
21+
* Listener called when a connection gets an error trying to write on the socket.
22+
* This can be used to trigger connection recovery.
* <p>
* Within this change set, {@code AMQConnection.flush()} calls the listener when
* flushing its frame handler throws. {@code AMQConnection} installs a no-op
* implementation when {@code ConnectionParams} provides none, and
* {@code AutorecoveringConnection} registers an implementation that casts the
* connection to {@code AMQConnection} and calls {@code handleIoError(exception)}.
23+
*/
24+
public interface ErrorOnWriteListener {
25+
26+
/**
27+
* Called when writing to the socket failed.
28+
* @param connection the owning connection instance
29+
* @param exception the thrown exception
30+
*/
31+
void handle(Connection connection, Throwable exception);
32+
33+
}

src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringConnection.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import com.rabbitmq.client.*;
1919
import com.rabbitmq.client.impl.AMQConnection;
2020
import com.rabbitmq.client.impl.ConnectionParams;
21+
import com.rabbitmq.client.impl.ErrorOnWriteListener;
2122
import com.rabbitmq.client.impl.FrameHandlerFactory;
2223
import com.rabbitmq.client.impl.NetworkConnection;
2324
import com.rabbitmq.utility.Utility;
@@ -87,6 +88,14 @@ public AutorecoveringConnection(ConnectionParams params, FrameHandlerFactory f,
8788
this.cf = new RecoveryAwareAMQConnectionFactory(params, f, addressResolver, metricsCollector);
8889
this.params = params;
8990

91+
this.params.setErrorOnWriteListener(new ErrorOnWriteListener() {
92+
@Override
93+
public void handle(Connection connection, Throwable exception) {
94+
AMQConnection c = (AMQConnection) connection;
95+
c.handleIoError(exception);
96+
}
97+
});
98+
9099
this.channels = new ConcurrentHashMap<Integer, AutorecoveringChannel>();
91100
}
92101

Lines changed: 200 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,200 @@
1+
// Copyright (c) 2018-Present Pivotal Software, Inc. All rights reserved.
2+
//
3+
// This software, the RabbitMQ Java client library, is triple-licensed under the
4+
// Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2
5+
// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see
6+
// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL,
7+
// please see LICENSE-APACHE2.
8+
//
9+
// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
10+
// either express or implied. See the LICENSE file for specific language governing
11+
// rights and limitations of this software.
12+
//
13+
// If you have any questions regarding licensing, please contact us at
14+
// info@rabbitmq.com.
15+
16+
package com.rabbitmq.client.test;
17+
18+
import com.rabbitmq.client.AMQP;
19+
import com.rabbitmq.client.Channel;
20+
import com.rabbitmq.client.Connection;
21+
import com.rabbitmq.client.ConnectionFactory;
22+
import com.rabbitmq.client.DefaultConsumer;
23+
import com.rabbitmq.client.Envelope;
24+
import com.rabbitmq.client.Recoverable;
25+
import com.rabbitmq.client.RecoveryListener;
26+
import com.rabbitmq.client.impl.recovery.AutorecoveringChannel;
27+
import com.rabbitmq.client.impl.recovery.AutorecoveringConnection;
28+
import org.junit.After;
29+
import org.junit.Before;
30+
import org.junit.Test;
31+
32+
import java.io.IOException;
33+
import java.util.HashMap;
34+
import java.util.Map;
35+
import java.util.UUID;
36+
import java.util.concurrent.Callable;
37+
import java.util.concurrent.CountDownLatch;
38+
import java.util.concurrent.ExecutorService;
39+
import java.util.concurrent.Executors;
40+
import java.util.concurrent.TimeUnit;
41+
42+
import static org.hamcrest.Matchers.is;
43+
import static org.junit.Assert.assertThat;
44+
45+
/**
46+
* Test to trigger and check the fix of https://github.com/rabbitmq/rabbitmq-java-client/issues/341.
47+
* Conditions:
48+
* - client registers a consumer and calls QoS afterwards
49+
* - client gets many messages and the consumer is slow
50+
* - the work pool queue is full, the reading thread is stuck
51+
* - more messages come from the network and saturate the TCP buffer
52+
* - the connection dies but the client doesn't detect it
53+
* - acks of messages fail
54+
* - connection recovery is never triggered
55+
*
56+
* The fix consists in triggering connection recovery when writing
57+
* to the socket fails. As the socket is dead, the closing
58+
* sequence can take some time, hence the setup of the shutdown
59+
* listener, which avoids waiting for the socket termination by
60+
* the OS (can take 15 minutes on linux).
61+
*/
62+
public class NoAutoRecoveryWhenTcpWindowIsFullTest {
63+
64+
private static final int QOS_PREFETCH = 64;
65+
private static final int NUM_MESSAGES_TO_PRODUCE = 100000;
66+
private static final int MESSAGE_PROCESSING_TIME_MS = 3000;
67+
68+
private ExecutorService dispatchingService;
69+
private ExecutorService producerService;
70+
private ExecutorService shutdownService;
71+
private AutorecoveringConnection producingConnection;
72+
private AutorecoveringChannel producingChannel;
73+
private AutorecoveringConnection consumingConnection;
74+
private AutorecoveringChannel consumingChannel;
75+
76+
private CountDownLatch consumerRecoverOkLatch;
77+
78+
@Before
79+
public void setUp() throws Exception {
80+
dispatchingService = Executors.newSingleThreadExecutor();
81+
shutdownService = Executors.newSingleThreadExecutor();
82+
producerService = Executors.newSingleThreadExecutor();
83+
final ConnectionFactory factory = TestUtils.connectionFactory();
84+
factory.setAutomaticRecoveryEnabled(true);
85+
factory.setTopologyRecoveryEnabled(true);
86+
// we try to set the lower values for closing timeouts, etc.
87+
// this makes the test execute faster.
88+
factory.setShutdownExecutor(shutdownService);
89+
factory.setShutdownTimeout(10000);
90+
factory.setRequestedHeartbeat(5);
91+
factory.setShutdownExecutor(dispatchingService);
92+
factory.setNetworkRecoveryInterval(1000);
93+
94+
producingConnection = (AutorecoveringConnection) factory.newConnection("Producer Connection");
95+
producingChannel = (AutorecoveringChannel) producingConnection.createChannel();
96+
consumingConnection = (AutorecoveringConnection) factory.newConnection("Consuming Connection");
97+
consumingChannel = (AutorecoveringChannel) consumingConnection.createChannel();
98+
99+
consumerRecoverOkLatch = new CountDownLatch(1);
100+
}
101+
102+
@After
103+
public void tearDown() throws IOException {
104+
dispatchingService.shutdownNow();
105+
producerService.shutdownNow();
106+
shutdownService.shutdownNow();
107+
closeConnectionIfOpen(consumingConnection);
108+
closeConnectionIfOpen(producingConnection);
109+
}
110+
111+
@Test
112+
public void failureAndRecovery() throws IOException, InterruptedException {
113+
final String queue = UUID.randomUUID().toString();
114+
115+
final CountDownLatch latch = new CountDownLatch(1);
116+
117+
consumingConnection.addRecoveryListener(new RecoveryListener() {
118+
119+
@Override
120+
public void handleRecovery(Recoverable recoverable) {
121+
}
122+
123+
@Override
124+
public void handleRecoveryStarted(Recoverable recoverable) {
125+
latch.countDown();
126+
}
127+
});
128+
129+
declareQueue(producingChannel, queue);
130+
produceMessagesInBackground(producingChannel, queue);
131+
startConsumer(queue);
132+
133+
assertThat(
134+
"Connection should have been closed and should have recovered by now",
135+
latch.await(60, TimeUnit.SECONDS), is(true)
136+
);
137+
138+
assertThat(
139+
"Consumer should have recovered by now",
140+
latch.await(5, TimeUnit.SECONDS), is(true)
141+
);
142+
}
143+
144+
private void closeConnectionIfOpen(Connection connection) throws IOException {
145+
if (connection.isOpen()) {
146+
connection.close();
147+
}
148+
}
149+
150+
private void declareQueue(final Channel channel, final String queue) throws IOException {
151+
final Map<String, Object> queueArguments = new HashMap<String, Object>();
152+
channel.queueDeclare(queue, false, false, false, queueArguments);
153+
}
154+
155+
private void produceMessagesInBackground(final Channel channel, final String queue) {
156+
final AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder().deliveryMode(1).build();
157+
producerService.submit(new Callable<Void>() {
158+
159+
@Override
160+
public Void call() throws Exception {
161+
for (int i = 0; i < NUM_MESSAGES_TO_PRODUCE; i++) {
162+
channel.basicPublish("", queue, false, properties, ("MSG NUM" + i).getBytes());
163+
}
164+
closeConnectionIfOpen(producingConnection);
165+
return null;
166+
}
167+
});
168+
}
169+
170+
private void startConsumer(final String queue) throws IOException {
171+
consumingChannel.basicConsume(queue, false, "", false, false, null, new DefaultConsumer(consumingChannel) {
172+
173+
@Override
174+
public void handleRecoverOk(String consumerTag) {
175+
consumerRecoverOkLatch.countDown();
176+
}
177+
178+
@Override
179+
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
180+
consumerWork();
181+
consumingChannel.basicAck(envelope.getDeliveryTag(), false);
182+
}
183+
});
184+
try {
185+
Thread.sleep(500);
186+
} catch (InterruptedException e) {
187+
e.printStackTrace();
188+
}
189+
consumingChannel.basicQos(QOS_PREFETCH);
190+
}
191+
192+
private void consumerWork() {
193+
try {
194+
Thread.sleep(MESSAGE_PROCESSING_TIME_MS);
195+
} catch (InterruptedException e) {
196+
e.printStackTrace();
197+
}
198+
}
199+
}
200+

0 commit comments

Comments
 (0)