Skip to content

Commit c8926f8

Browse files
committed
Trigger connection recovery on socket write error
Automatic connection recovery triggers now by default when a write operation fails because of an IO exception. The recovery process takes place in a dedicated thread so the write operation doesn't wait (it receives the same IO exception immediatly). The test to trigger the error has changed: it doesn't use manual ack anymore, as this could sometimes block the broker and make recovery fail (broker was busy re-enqueuing messages). The test now sends a message in the consumer, which is enough to reproduce the error. Note the test against NIO is skipped right now, as it needs additional care. [#154263515] References #341
1 parent cea5e97 commit c8926f8

File tree

5 files changed

+113
-53
lines changed

5 files changed

+113
-53
lines changed

src/main/java/com/rabbitmq/client/impl/AMQConnection.java

Lines changed: 29 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -254,7 +254,7 @@ public AMQConnection(ConnectionParams params, FrameHandler frameHandler, Metrics
254254
this.errorOnWriteListener = params.getErrorOnWriteListener() != null ? params.getErrorOnWriteListener() :
255255
new ErrorOnWriteListener() {
256256
@Override
257-
public void handle(Connection connection, Throwable exception) { }
257+
public void handle(Connection connection, IOException exception) { }
258258
};
259259

260260
}
@@ -572,7 +572,6 @@ public void flush() throws IOException {
572572
_frameHandler.flush();
573573
} catch (IOException ioe) {
574574
this.errorOnWriteListener.handle(this, ioe);
575-
throw ioe;
576575
}
577576
}
578577

@@ -592,15 +591,24 @@ private class MainLoop implements Runnable {
592591
*/
593592
@Override
594593
public void run() {
594+
boolean shouldDoFinalShutdown = true;
595595
try {
596596
while (_running) {
597597
Frame frame = _frameHandler.readFrame();
598598
readFrame(frame);
599599
}
600600
} catch (Throwable ex) {
601-
handleFailure(ex);
601+
if (ex instanceof InterruptedException) {
602+
// loop has been interrupted during shutdown,
603+
// no need to do it again
604+
shouldDoFinalShutdown = false;
605+
} else {
606+
handleFailure(ex);
607+
}
602608
} finally {
603-
doFinalShutdown();
609+
if (shouldDoFinalShutdown) {
610+
doFinalShutdown();
611+
}
604612
}
605613
}
606614
}
@@ -706,6 +714,7 @@ public void doFinalShutdown() {
706714
if (finalShutdownStarted.compareAndSet(false, true)) {
707715
_frameHandler.close();
708716
_appContinuation.set(null);
717+
closeMainLoopThreadIfNecessary();
709718
notifyListeners();
710719
// assuming that shutdown listeners do not do anything
711720
// asynchronously, e.g. start new threads, this effectively
@@ -715,6 +724,22 @@ public void doFinalShutdown() {
715724
}
716725
}
717726

727+
private void closeMainLoopThreadIfNecessary() {
728+
if (mainLoopReadThreadNotNull() && notInMainLoopThread()) {
729+
if (this.mainLoopThread.isAlive()) {
730+
this.mainLoopThread.interrupt();
731+
}
732+
}
733+
}
734+
735+
private boolean notInMainLoopThread() {
736+
return Thread.currentThread() != this.mainLoopThread;
737+
}
738+
739+
private boolean mainLoopReadThreadNotNull() {
740+
return this.mainLoopThread != null;
741+
}
742+
718743
private void notifyRecoveryCanBeginListeners() {
719744
ShutdownSignalException sse = this.getCloseReason();
720745
for(RecoveryCanBeginListener fn : Utility.copy(this.recoveryCanBeginListeners)) {

src/main/java/com/rabbitmq/client/impl/ErrorOnWriteListener.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
import com.rabbitmq.client.Connection;
1919

20+
import java.io.IOException;
21+
2022
/**
2123
* Listener called when a connection gets an error trying to write on the socket.
2224
* This can be used to trigger connection recovery.
@@ -28,6 +30,6 @@ public interface ErrorOnWriteListener {
2830
* @param connection the owning connection instance
2931
* @param exception the thrown exception
3032
*/
31-
void handle(Connection connection, Throwable exception);
33+
void handle(Connection connection, IOException exception) throws IOException;
3234

3335
}

src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringConnection.java

Lines changed: 41 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,18 @@
2222
import com.rabbitmq.client.impl.FrameHandlerFactory;
2323
import com.rabbitmq.client.impl.NetworkConnection;
2424
import com.rabbitmq.utility.Utility;
25+
import org.slf4j.Logger;
26+
import org.slf4j.LoggerFactory;
2527

2628
import java.io.IOException;
2729
import java.net.InetAddress;
2830
import java.util.*;
2931
import java.util.concurrent.ConcurrentHashMap;
3032
import java.util.concurrent.ExecutorService;
33+
import java.util.concurrent.ThreadFactory;
3134
import java.util.concurrent.TimeoutException;
35+
import java.util.concurrent.locks.Lock;
36+
import java.util.concurrent.locks.ReentrantLock;
3237

3338
/**
3439
* Connection implementation that performs automatic recovery when
@@ -52,6 +57,9 @@
5257
* @since 3.3.0
5358
*/
5459
public class AutorecoveringConnection implements RecoverableConnection, NetworkConnection {
60+
61+
private static final Logger LOGGER = LoggerFactory.getLogger(AutorecoveringConnection.class);
62+
5563
private final RecoveryAwareAMQConnectionFactory cf;
5664
private final Map<Integer, AutorecoveringChannel> channels;
5765
private final ConnectionParams params;
@@ -88,15 +96,37 @@ public AutorecoveringConnection(ConnectionParams params, FrameHandlerFactory f,
8896
this.cf = new RecoveryAwareAMQConnectionFactory(params, f, addressResolver, metricsCollector);
8997
this.params = params;
9098

99+
setupErrorOnWriteListenerForPotentialRecovery();
100+
101+
this.channels = new ConcurrentHashMap<Integer, AutorecoveringChannel>();
102+
}
103+
104+
private void setupErrorOnWriteListenerForPotentialRecovery() {
105+
final ThreadFactory threadFactory = this.params.getThreadFactory();
106+
final Lock errorOnWriteLock = new ReentrantLock();
91107
this.params.setErrorOnWriteListener(new ErrorOnWriteListener() {
92108
@Override
93-
public void handle(Connection connection, Throwable exception) {
94-
AMQConnection c = (AMQConnection) connection;
95-
c.handleIoError(exception);
109+
public void handle(final Connection connection, final IOException exception) throws IOException {
110+
// this is called for any write error
111+
// we should trigger the error handling and the recovery only once
112+
if (errorOnWriteLock.tryLock()) {
113+
try {
114+
Thread recoveryThread = threadFactory.newThread(new Runnable() {
115+
@Override
116+
public void run() {
117+
AMQConnection c = (AMQConnection) connection;
118+
c.handleIoError(exception);
119+
}
120+
});
121+
recoveryThread.setName("RabbitMQ Error On Write Thread");
122+
recoveryThread.start();
123+
} finally {
124+
errorOnWriteLock.unlock();
125+
}
126+
}
127+
throw exception;
96128
}
97129
});
98-
99-
this.channels = new ConcurrentHashMap<Integer, AutorecoveringChannel>();
100130
}
101131

102132
/**
@@ -661,7 +691,9 @@ private void recoverConsumers() {
661691
for (Map.Entry<String, RecordedConsumer> entry : Utility.copy(this.consumers).entrySet()) {
662692
String tag = entry.getKey();
663693
RecordedConsumer consumer = entry.getValue();
664-
694+
if (LOGGER.isDebugEnabled()) {
695+
LOGGER.debug("Recovering consumer {}", consumer);
696+
}
665697
try {
666698
String newTag = consumer.recover();
667699
// make sure server-generated tags are re-added. MK.
@@ -676,6 +708,9 @@ private void recoverConsumers() {
676708
for(ConsumerRecoveryListener crl : Utility.copy(this.consumerRecoveryListeners)) {
677709
crl.consumerRecovered(tag, newTag);
678710
}
711+
if (LOGGER.isDebugEnabled()) {
712+
LOGGER.debug("Consumer {} has recovered", consumer);
713+
}
679714
} catch (Exception cause) {
680715
final String message = "Caught an exception while recovering consumer " + tag +
681716
": " + cause.getMessage();

src/test/java/com/rabbitmq/client/test/NoAutoRecoveryWhenTcpWindowIsFullTest.java

Lines changed: 39 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import com.rabbitmq.client.Connection;
2121
import com.rabbitmq.client.ConnectionFactory;
2222
import com.rabbitmq.client.DefaultConsumer;
23+
import com.rabbitmq.client.DefaultSocketConfigurator;
2324
import com.rabbitmq.client.Envelope;
2425
import com.rabbitmq.client.Recoverable;
2526
import com.rabbitmq.client.RecoveryListener;
@@ -30,6 +31,7 @@
3031
import org.junit.Test;
3132

3233
import java.io.IOException;
34+
import java.net.Socket;
3335
import java.util.HashMap;
3436
import java.util.Map;
3537
import java.util.UUID;
@@ -45,68 +47,71 @@
4547
/**
4648
* Test to trigger and check the fix of https://github.com/rabbitmq/rabbitmq-java-client/issues/341.
4749
* Conditions:
48-
* - client registers consumer and a call QoS after
49-
* - client get many messages and the consumer is slow
50+
* - client registers consumer
51+
* - client get many messages as the consumer is slow
5052
* - the work pool queue is full, the reading thread is stuck
5153
* - more messages come from the network and saturates the TCP buffer
5254
* - the connection dies but the client doesn't detect it
53-
* - acks of messages fail
55+
* - sending in the consumer fails
5456
* - connection recovery is never triggered
5557
* <p>
5658
* The fix consists in triggering connection recovery when writing
57-
* to the socket fails. As the socket is dead, the closing
58-
* sequence can take some time, hence the setup of the shutdown
59-
* listener, which avoids waiting for the socket termination by
60-
* the OS (can take 15 minutes on linux).
59+
* to the socket fails.
6160
*/
6261
public class NoAutoRecoveryWhenTcpWindowIsFullTest {
6362

64-
private static final int QOS_PREFETCH = 64;
65-
private static final int NUM_MESSAGES_TO_PRODUCE = 100000;
63+
private static final int NUM_MESSAGES_TO_PRODUCE = 50000;
6664
private static final int MESSAGE_PROCESSING_TIME_MS = 3000;
65+
private static final byte[] MESSAGE_CONTENT = ("MESSAGE CONTENT " + NUM_MESSAGES_TO_PRODUCE).getBytes();
6766

68-
private ExecutorService dispatchingService;
69-
private ExecutorService producerService;
70-
private ExecutorService shutdownService;
67+
private ExecutorService executorService;
7168
private AutorecoveringConnection producingConnection;
7269
private AutorecoveringChannel producingChannel;
7370
private AutorecoveringConnection consumingConnection;
7471
private AutorecoveringChannel consumingChannel;
7572

76-
private CountDownLatch consumerRecoverOkLatch;
73+
private CountDownLatch consumerOkLatch;
7774

7875
@Before
7976
public void setUp() throws Exception {
80-
dispatchingService = Executors.newSingleThreadExecutor();
81-
shutdownService = Executors.newSingleThreadExecutor();
82-
producerService = Executors.newSingleThreadExecutor();
77+
// we need several threads to publish, dispatch deliveries, handle RPC responses, etc.
78+
executorService = Executors.newFixedThreadPool(10);
8379
final ConnectionFactory factory = TestUtils.connectionFactory();
80+
factory.setSocketConfigurator(new DefaultSocketConfigurator() {
81+
82+
int DEFAULT_RECEIVE_BUFFER_SIZE = 43690;
83+
84+
@Override
85+
public void configure(Socket socket) throws IOException {
86+
super.configure(socket);
87+
socket.setReceiveBufferSize(DEFAULT_RECEIVE_BUFFER_SIZE);
88+
}
89+
});
8490
factory.setAutomaticRecoveryEnabled(true);
8591
factory.setTopologyRecoveryEnabled(true);
8692
// we try to set the lower values for closing timeouts, etc.
8793
// this makes the test execute faster.
88-
factory.setShutdownExecutor(shutdownService);
89-
factory.setShutdownTimeout(10000);
9094
factory.setRequestedHeartbeat(5);
91-
factory.setSharedExecutor(dispatchingService);
92-
factory.setNetworkRecoveryInterval(1000);
95+
factory.setSharedExecutor(executorService);
96+
// we need the shutdown executor: channel shutting down depends on the work pool,
97+
// which is full. Channel shutting down will time out with the shutdown executor
98+
factory.setShutdownExecutor(executorService);
99+
factory.setNetworkRecoveryInterval(2000);
93100

94101
producingConnection = (AutorecoveringConnection) factory.newConnection("Producer Connection");
95102
producingChannel = (AutorecoveringChannel) producingConnection.createChannel();
96103
consumingConnection = (AutorecoveringConnection) factory.newConnection("Consuming Connection");
97104
consumingChannel = (AutorecoveringChannel) consumingConnection.createChannel();
98105

99-
consumerRecoverOkLatch = new CountDownLatch(1);
106+
consumerOkLatch = new CountDownLatch(2);
100107
}
101108

102109
@After
103110
public void tearDown() throws IOException {
104111
closeConnectionIfOpen(consumingConnection);
105112
closeConnectionIfOpen(producingConnection);
106113

107-
dispatchingService.shutdownNow();
108-
producerService.shutdownNow();
109-
shutdownService.shutdownNow();
114+
executorService.shutdownNow();
110115
}
111116

112117
@Test
@@ -116,17 +121,17 @@ public void failureAndRecovery() throws IOException, InterruptedException {
116121
}
117122
final String queue = UUID.randomUUID().toString();
118123

119-
final CountDownLatch latch = new CountDownLatch(1);
124+
final CountDownLatch recoveryLatch = new CountDownLatch(1);
120125

121126
consumingConnection.addRecoveryListener(new RecoveryListener() {
122127

123128
@Override
124129
public void handleRecovery(Recoverable recoverable) {
130+
recoveryLatch.countDown();
125131
}
126132

127133
@Override
128134
public void handleRecoveryStarted(Recoverable recoverable) {
129-
latch.countDown();
130135
}
131136
});
132137

@@ -136,12 +141,12 @@ public void handleRecoveryStarted(Recoverable recoverable) {
136141

137142
assertThat(
138143
"Connection should have been closed and should have recovered by now",
139-
latch.await(60, TimeUnit.SECONDS), is(true)
144+
recoveryLatch.await(60, TimeUnit.SECONDS), is(true)
140145
);
141146

142147
assertThat(
143148
"Consumer should have recovered by now",
144-
latch.await(5, TimeUnit.SECONDS), is(true)
149+
consumerOkLatch.await(5, TimeUnit.SECONDS), is(true)
145150
);
146151
}
147152

@@ -158,12 +163,12 @@ private void declareQueue(final Channel channel, final String queue) throws IOEx
158163

159164
private void produceMessagesInBackground(final Channel channel, final String queue) throws IOException {
160165
final AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder().deliveryMode(1).build();
161-
producerService.submit(new Callable<Void>() {
166+
executorService.submit(new Callable<Void>() {
162167

163168
@Override
164169
public Void call() throws Exception {
165170
for (int i = 0; i < NUM_MESSAGES_TO_PRODUCE; i++) {
166-
channel.basicPublish("", queue, false, properties, ("MSG NUM" + i).getBytes());
171+
channel.basicPublish("", queue, false, properties, MESSAGE_CONTENT);
167172
}
168173
closeConnectionIfOpen(producingConnection);
169174
return null;
@@ -172,36 +177,29 @@ public Void call() throws Exception {
172177
}
173178

174179
private void startConsumer(final String queue) throws IOException {
175-
consumingChannel.basicConsume(queue, false, "", false, false, null, new DefaultConsumer(consumingChannel) {
180+
consumingChannel.basicConsume(queue, true, new DefaultConsumer(consumingChannel) {
176181

177182
@Override
178-
public void handleRecoverOk(String consumerTag) {
179-
consumerRecoverOkLatch.countDown();
183+
public void handleConsumeOk(String consumerTag) {
184+
consumerOkLatch.countDown();
180185
}
181186

182187
@Override
183188
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
184189
consumerWork();
185190
try {
186-
consumingChannel.basicAck(envelope.getDeliveryTag(), false);
191+
consumingChannel.basicPublish("", "", null, "".getBytes());
187192
} catch (Exception e) {
188193
// application should handle writing exceptions
189194
}
190195
}
191196
});
192-
try {
193-
Thread.sleep(500);
194-
} catch (InterruptedException e) {
195-
e.printStackTrace();
196-
}
197-
consumingChannel.basicQos(QOS_PREFETCH);
198197
}
199198

200199
private void consumerWork() {
201200
try {
202201
Thread.sleep(MESSAGE_PROCESSING_TIME_MS);
203202
} catch (InterruptedException e) {
204-
e.printStackTrace();
205203
}
206204
}
207205
}

src/test/resources/logback-test.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
</encoder>
66
</appender>
77

8-
<root level="info">
8+
<root level="warn">
99
<appender-ref ref="STDOUT" />
1010
</root>
1111
</configuration>

0 commit comments

Comments
 (0)