Skip to content

Commit d975201

Browse files
committed
Add configurable timeout on RPC calls
Fixes #219
1 parent ee3aeac commit d975201

File tree

9 files changed

+382
-11
lines changed

9 files changed

+382
-11
lines changed
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
package com.rabbitmq.client;
2+
3+
import java.io.IOException;
4+
import java.util.concurrent.TimeoutException;
5+
6+
/**
7+
*
8+
*/
9+
public class ChannelRpcTimeoutException extends IOException {
10+
11+
private final Object channel;
12+
13+
private final Method method;
14+
15+
public ChannelRpcTimeoutException(TimeoutException cause, Object channel, Method method) {
16+
super(cause);
17+
this.channel = channel;
18+
this.method = method;
19+
}
20+
21+
public Method getMethod() {
22+
return method;
23+
}
24+
25+
public Object getChannel() {
26+
return channel;
27+
}
28+
}

src/main/java/com/rabbitmq/client/ConnectionFactory.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,9 @@ public class ConnectionFactory implements Cloneable {
7373
* zero means wait indefinitely */
7474
public static final int DEFAULT_SHUTDOWN_TIMEOUT = 10000;
7575

76+
/** The default timeout for RPC calls in channels: no timeout */
77+
public static final int DEFAULT_CHANNEL_RPC_TIMEOUT = -1;
78+
7679
private static final String PREFERRED_TLS_PROTOCOL = "TLSv1.2";
7780

7881
private static final String FALLBACK_TLS_PROTOCOL = "TLSv1";
@@ -116,6 +119,8 @@ public class ConnectionFactory implements Cloneable {
116119

117120
private SSLContext sslContext;
118121

122+
private int channelRpcTimeout = DEFAULT_CHANNEL_RPC_TIMEOUT;
123+
119124
/** @return the default host to use for connections */
120125
public String getHost() {
121126
return host;
@@ -937,6 +942,7 @@ public ConnectionParams params(ExecutorService consumerWorkServiceExecutor) {
937942
result.setRequestedHeartbeat(requestedHeartbeat);
938943
result.setShutdownExecutor(shutdownExecutor);
939944
result.setHeartbeatExecutor(heartbeatExecutor);
945+
result.setChannelRpcTimeout(channelRpcTimeout);
940946
return result;
941947
}
942948

@@ -1079,4 +1085,21 @@ public void useNio() {
10791085
public void useBlockingIo() {
10801086
this.nio = false;
10811087
}
1088+
1089+
/**
1090+
* Set the timeout for RPC calls in channels.
1091+
* Default is no timeout.
1092+
* @param channelRpcTimeout
1093+
*/
1094+
public void setChannelRpcTimeout(int channelRpcTimeout) {
1095+
this.channelRpcTimeout = channelRpcTimeout;
1096+
}
1097+
1098+
/**
1099+
* Get the timeout for RPC calls in channels.
1100+
* @return
1101+
*/
1102+
public int getChannelRpcTimeout() {
1103+
return channelRpcTimeout;
1104+
}
10821105
}

src/main/java/com/rabbitmq/client/impl/AMQChannel.java

Lines changed: 29 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,11 @@
1919
import java.io.IOException;
2020
import java.util.concurrent.TimeoutException;
2121

22-
import com.rabbitmq.client.AlreadyClosedException;
23-
import com.rabbitmq.client.Command;
22+
import com.rabbitmq.client.*;
2423
import com.rabbitmq.client.Method;
25-
import com.rabbitmq.client.Connection;
26-
import com.rabbitmq.client.ShutdownSignalException;
2724
import com.rabbitmq.utility.BlockingValueOrException;
25+
import org.slf4j.Logger;
26+
import org.slf4j.LoggerFactory;
2827

2928
/**
3029
* Base class modelling an AMQ channel. Subclasses implement
@@ -37,6 +36,11 @@
3736
* @see Connection
3837
*/
3938
public abstract class AMQChannel extends ShutdownNotifierComponent {
39+
40+
private static final Logger LOGGER = LoggerFactory.getLogger(AMQChannel.class);
41+
42+
private static final int NO_RPC_TIMEOUT = ConnectionFactory.DEFAULT_CHANNEL_RPC_TIMEOUT;
43+
4044
/**
4145
* Protected; used instead of synchronizing on the channel itself,
4246
* so that clients can themselves use the channel to synchronize
@@ -59,6 +63,9 @@ public abstract class AMQChannel extends ShutdownNotifierComponent {
5963
/** Whether transmission of content-bearing methods should be blocked */
6064
public volatile boolean _blockContent = false;
6165

66+
/** Timeout for RPC calls */
67+
private final int _rpcTimeout;
68+
6269
/**
6370
* Construct a channel on the given connection, with the given channel number.
6471
* @param connection the underlying connection for this channel
@@ -67,6 +74,7 @@ public abstract class AMQChannel extends ShutdownNotifierComponent {
6774
public AMQChannel(AMQConnection connection, int channelNumber) {
6875
this._connection = connection;
6976
this._channelNumber = channelNumber;
77+
this._rpcTimeout = connection.getChannelRpcTimeout();
7078
}
7179

7280
/**
@@ -225,8 +233,23 @@ private AMQCommand privateRpc(Method m)
225233
//
226234
// Calling getReply() on the continuation puts us to sleep
227235
// until the connection's reader-thread throws the reply over
228-
// the fence.
229-
return k.getReply();
236+
// the fence or the RPC times out (if enabled)
237+
if(_rpcTimeout == NO_RPC_TIMEOUT) {
238+
return k.getReply();
239+
} else {
240+
try {
241+
return k.getReply(_rpcTimeout);
242+
} catch (TimeoutException e) {
243+
try {
244+
// clean RPC channel state
245+
nextOutstandingRpc();
246+
markRpcFinished();
247+
} catch(Exception ex) {
248+
LOGGER.warn("Error while cleaning timed out channel RPC: {}", ex.getMessage());
249+
}
250+
throw new ChannelRpcTimeoutException(e, this, m);
251+
}
252+
}
230253
}
231254

232255
private AMQCommand privateRpc(Method m, int timeout)

src/main/java/com/rabbitmq/client/impl/AMQConnection.java

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -94,11 +94,7 @@ public static Map<String, Object> defaultClientProperties() {
9494
new Version(AMQP.PROTOCOL.MAJOR, AMQP.PROTOCOL.MINOR);
9595

9696
/** The special channel 0 (<i>not</i> managed by the <code><b>_channelManager</b></code>) */
97-
private final AMQChannel _channel0 = new AMQChannel(this, 0) {
98-
@Override public boolean processAsync(Command c) throws IOException {
99-
return getConnection().processControlCommand(c);
100-
}
101-
};
97+
private final AMQChannel _channel0;
10298

10399
protected ConsumerWorkService _workService = null;
104100

@@ -137,6 +133,7 @@ public static Map<String, Object> defaultClientProperties() {
137133
private final String password;
138134
private final Collection<BlockedListener> blockedListeners = new CopyOnWriteArrayList<BlockedListener>();
139135
protected final MetricsCollector metricsCollector;
136+
private final int channelRpcTimeout;
140137

141138
/* State modified after start - all volatile */
142139

@@ -228,6 +225,13 @@ public AMQConnection(ConnectionParams params, FrameHandler frameHandler, Metrics
228225
this.heartbeatExecutor = params.getHeartbeatExecutor();
229226
this.shutdownExecutor = params.getShutdownExecutor();
230227
this.threadFactory = params.getThreadFactory();
228+
this.channelRpcTimeout = params.getChannelRpcTimeout();
229+
230+
this._channel0 = new AMQChannel(this, 0) {
231+
@Override public boolean processAsync(Command c) throws IOException {
232+
return getConnection().processControlCommand(c);
233+
}
234+
};
231235

232236
this._channelManager = null;
233237

@@ -1045,4 +1049,8 @@ public String getId() {
10451049
public void setId(String id) {
10461050
this.id = id;
10471051
}
1052+
1053+
public int getChannelRpcTimeout() {
1054+
return channelRpcTimeout;
1055+
}
10481056
}

src/main/java/com/rabbitmq/client/impl/ConnectionParams.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ public class ConnectionParams {
3939
private SaslConfig saslConfig;
4040
private long networkRecoveryInterval;
4141
private boolean topologyRecovery;
42+
private int channelRpcTimeout;
4243

4344
private ExceptionHandler exceptionHandler;
4445
private ThreadFactory threadFactory;
@@ -109,6 +110,10 @@ public ThreadFactory getThreadFactory() {
109110
return threadFactory;
110111
}
111112

113+
public int getChannelRpcTimeout() {
114+
return channelRpcTimeout;
115+
}
116+
112117
public void setUsername(String username) {
113118
this.username = username;
114119
}
@@ -180,4 +185,8 @@ public ScheduledExecutorService getHeartbeatExecutor() {
180185
public void setHeartbeatExecutor(ScheduledExecutorService heartbeatExecutor) {
181186
this.heartbeatExecutor = heartbeatExecutor;
182187
}
188+
189+
public void setChannelRpcTimeout(int channelRpcTimeout) {
190+
this.channelRpcTimeout = channelRpcTimeout;
191+
}
183192
}
Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
// Copyright (c) 2007-Present Pivotal Software, Inc. All rights reserved.
2+
//
3+
// This software, the RabbitMQ Java client library, is triple-licensed under the
4+
// Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2
5+
// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see
6+
// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL,
7+
// please see LICENSE-APACHE2.
8+
//
9+
// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
10+
// either express or implied. See the LICENSE file for specific language governing
11+
// rights and limitations of this software.
12+
//
13+
// If you have any questions regarding licensing, please contact us at
14+
// info@rabbitmq.com.
15+
16+
package com.rabbitmq.client.test;
17+
18+
import com.rabbitmq.client.ChannelRpcTimeoutException;
19+
import com.rabbitmq.client.Command;
20+
import com.rabbitmq.client.Method;
21+
import com.rabbitmq.client.impl.AMQChannel;
22+
import com.rabbitmq.client.impl.AMQCommand;
23+
import com.rabbitmq.client.impl.AMQConnection;
24+
import com.rabbitmq.client.impl.AMQImpl;
25+
import org.junit.After;
26+
import org.junit.Before;
27+
import org.junit.Test;
28+
29+
import java.io.IOException;
30+
import java.util.concurrent.Callable;
31+
import java.util.concurrent.Executors;
32+
import java.util.concurrent.ScheduledExecutorService;
33+
import java.util.concurrent.TimeUnit;
34+
35+
import static org.hamcrest.Matchers.is;
36+
import static org.junit.Assert.*;
37+
import static org.mockito.Mockito.mock;
38+
import static org.mockito.Mockito.when;
39+
40+
public class AMQChannelTest {
41+
42+
ScheduledExecutorService scheduler;
43+
44+
@Before public void init() {
45+
scheduler = Executors.newSingleThreadScheduledExecutor();
46+
}
47+
48+
@After public void tearDown() {
49+
scheduler.shutdownNow();
50+
}
51+
52+
@Test public void rpcTimesOutWhenResponseDoesNotCome() throws IOException {
53+
int rpcTimeout = 100;
54+
AMQConnection connection = mock(AMQConnection.class);
55+
when(connection.getChannelRpcTimeout()).thenReturn(rpcTimeout);
56+
57+
DummyAmqChannel channel = new DummyAmqChannel(connection, 1);
58+
Method method = new AMQImpl.Queue.Declare.Builder()
59+
.queue("")
60+
.durable(false)
61+
.exclusive(true)
62+
.autoDelete(true)
63+
.arguments(null)
64+
.build();
65+
66+
try {
67+
channel.rpc(method);
68+
fail("Should time out and throw an exception");
69+
} catch(ChannelRpcTimeoutException e) {
70+
// OK
71+
assertThat((DummyAmqChannel) e.getChannel(), is(channel));
72+
assertThat(e.getMethod(), is(method));
73+
assertNull("outstanding RPC should have been cleaned", channel.nextOutstandingRpc());
74+
}
75+
}
76+
77+
@Test public void rpcReturnsResultWhenResponseHasCome() throws IOException {
78+
int rpcTimeout = 1000;
79+
AMQConnection connection = mock(AMQConnection.class);
80+
when(connection.getChannelRpcTimeout()).thenReturn(rpcTimeout);
81+
82+
final DummyAmqChannel channel = new DummyAmqChannel(connection, 1);
83+
Method method = new AMQImpl.Queue.Declare.Builder()
84+
.queue("")
85+
.durable(false)
86+
.exclusive(true)
87+
.autoDelete(true)
88+
.arguments(null)
89+
.build();
90+
91+
final Method response = new AMQImpl.Queue.DeclareOk.Builder()
92+
.queue("whatever")
93+
.consumerCount(0)
94+
.messageCount(0).build();
95+
96+
scheduler.schedule(new Callable<Void>() {
97+
@Override
98+
public Void call() throws Exception {
99+
channel.handleCompleteInboundCommand(new AMQCommand(response));
100+
return null;
101+
}
102+
}, (long) (rpcTimeout / 2.0), TimeUnit.MILLISECONDS);
103+
104+
AMQCommand rpcResponse = channel.rpc(method);
105+
assertThat(rpcResponse.getMethod(), is(response));
106+
}
107+
108+
static class DummyAmqChannel extends AMQChannel {
109+
110+
public DummyAmqChannel(AMQConnection connection, int channelNumber) {
111+
super(connection, channelNumber);
112+
}
113+
114+
@Override
115+
public boolean processAsync(Command command) throws IOException {
116+
return false;
117+
}
118+
}
119+
120+
}

0 commit comments

Comments
 (0)