Skip to content

Commit b4f1f1c

Browse files
committed
Close asynchronously if called in NIO loop thread
References #1083 (cherry picked from commit 8d853c8) Conflicts: src/test/java/com/rabbitmq/client/test/ClientTestSuite.java
1 parent 9bbb35d commit b4f1f1c

File tree

4 files changed

+76
-2
lines changed

4 files changed

+76
-2
lines changed

src/main/java/com/rabbitmq/client/impl/AMQConnection.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,8 @@ public class AMQConnection extends ShutdownNotifierComponent implements Connecti
5858
private final ScheduledExecutorService heartbeatExecutor;
5959
private final ExecutorService shutdownExecutor;
6060
private Thread mainLoopThread;
61+
private final AtomicBoolean ioLoopThreadSet = new AtomicBoolean(false);
62+
private volatile Thread ioLoopThread;
6163
private ThreadFactory threadFactory = Executors.defaultThreadFactory();
6264
private String id;
6365

@@ -513,6 +515,7 @@ public void startMainLoop() {
513515
MainLoop loop = new MainLoop();
514516
final String name = "AMQP Connection " + getHostAddress() + ":" + getPort();
515517
mainLoopThread = Environment.newThread(threadFactory, loop, name);
518+
ioLoopThread(mainLoopThread);
516519
mainLoopThread.start();
517520
}
518521

@@ -1112,7 +1115,7 @@ public void close(int closeCode,
11121115
boolean abort)
11131116
throws IOException
11141117
{
1115-
boolean sync = !(Thread.currentThread() == mainLoopThread);
1118+
boolean sync = !(Thread.currentThread() == ioLoopThread);
11161119

11171120
try {
11181121
AMQP.Connection.Close reason =
@@ -1206,6 +1209,12 @@ public void setId(String id) {
12061209
this.id = id;
12071210
}
12081211

1212+
public void ioLoopThread(Thread thread) {
1213+
if (this.ioLoopThreadSet.compareAndSet(false, true)) {
1214+
this.ioLoopThread = thread;
1215+
}
1216+
}
1217+
12091218
public int getChannelRpcTimeout() {
12101219
return channelRpcTimeout;
12111220
}

src/main/java/com/rabbitmq/client/impl/nio/NioLoop.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,7 @@ public void run() {
157157

158158
if (frame != null) {
159159
try {
160+
state.getConnection().ioLoopThread(Thread.currentThread());
160161
boolean noProblem = state.getConnection().handleReadFrame(frame);
161162
if (noProblem && (!state.getConnection().isRunning() || state.getConnection().hasBrokerInitiatedShutdown())) {
162163
// looks like the frame was Close-Ok or Close
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
// Copyright (c) 2023 VMware, Inc. or its affiliates. All rights reserved.
2+
//
3+
// This software, the RabbitMQ Java client library, is triple-licensed under the
4+
// Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2
5+
// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see
6+
// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL,
7+
// please see LICENSE-APACHE2.
8+
//
9+
// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
10+
// either express or implied. See the LICENSE file for specific language governing
11+
// rights and limitations of this software.
12+
//
13+
// If you have any questions regarding licensing, please contact us at
14+
// info@rabbitmq.com.
15+
16+
package com.rabbitmq.client.test;
17+
18+
import static com.rabbitmq.client.test.TestUtils.LatchConditions.completed;
19+
import static com.rabbitmq.client.test.TestUtils.waitAtMost;
20+
import static org.assertj.core.api.Assertions.assertThat;
21+
22+
import com.rabbitmq.client.Channel;
23+
import com.rabbitmq.client.Connection;
24+
import com.rabbitmq.client.ConnectionFactory;
25+
import java.util.concurrent.CountDownLatch;
26+
import org.junit.jupiter.params.ParameterizedTest;
27+
import org.junit.jupiter.params.provider.ValueSource;
28+
29+
public class BlockedConnectionTest extends BrokerTestCase {
30+
31+
@ParameterizedTest
32+
@ValueSource(booleans = {true, false})
33+
void errorInBlockListenerShouldCloseConnection(boolean nio) throws Exception {
34+
ConnectionFactory cf = TestUtils.connectionFactory();
35+
if (nio) {
36+
cf.useNio();
37+
} else {
38+
cf.useBlockingIo();
39+
}
40+
Connection c = cf.newConnection();
41+
CountDownLatch shutdownLatch = new CountDownLatch(1);
42+
c.addShutdownListener(cause -> shutdownLatch.countDown());
43+
CountDownLatch blockedLatch = new CountDownLatch(1);
44+
c.addBlockedListener(
45+
reason -> {
46+
blockedLatch.countDown();
47+
throw new RuntimeException("error in blocked listener!");
48+
},
49+
() -> {});
50+
try {
51+
block();
52+
Channel ch = c.createChannel();
53+
ch.basicPublish("", "", null, "dummy".getBytes());
54+
assertThat(blockedLatch).is(completed());
55+
} finally {
56+
unblock();
57+
}
58+
assertThat(shutdownLatch).is(completed());
59+
waitAtMost(() -> !c.isOpen());
60+
}
61+
62+
}

src/test/java/com/rabbitmq/client/test/ClientTestSuite.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,9 @@
8181
DefaultCredentialsRefreshServiceTest.class,
8282
OAuth2ClientCredentialsGrantCredentialsProviderTest.class,
8383
RefreshCredentialsTest.class,
84-
AMQConnectionRefreshCredentialsTest.class
84+
AMQConnectionRefreshCredentialsTest.class,
85+
BlockedConnectionTest.class,
86+
ValueWriterTest.class
8587
})
8688
public class ClientTestSuite {
8789

0 commit comments

Comments
 (0)