Skip to content

Commit a8930cc

Browse files
Merge pull request #220 from rabbitmq/rabbitmq-java-client-219
Add configurable timeout on RPC calls
2 parents adc3899 + 6b5d676 commit a8930cc

File tree

9 files changed

+423
-11
lines changed

9 files changed

+423
-11
lines changed
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
package com.rabbitmq.client;
2+
3+
import java.io.IOException;
4+
import java.util.concurrent.TimeoutException;
5+
6+
/**
7+
* Exception thrown when a channel times out on a continuation during a RPC call.
8+
* @since 4.1.0
9+
*/
10+
public class ChannelContinuationTimeoutException extends IOException {
11+
12+
/**
13+
* The channel that performed the call.
14+
* Typed as <code>Object</code> as the underlying
15+
* object that performs the call might
16+
* not be an implementation of {@link Channel}.
17+
*/
18+
private final Object channel;
19+
20+
/**
21+
* The number of the channel that performed the call.
22+
*/
23+
private final int channelNumber;
24+
25+
/**
26+
* The request method that timed out.
27+
*/
28+
private final Method method;
29+
30+
public ChannelContinuationTimeoutException(TimeoutException cause, Object channel, int channelNumber, Method method) {
31+
super(
32+
"Continuation call for method " + method + " on channel " + channel + " (#" + channelNumber + ") timed out",
33+
cause
34+
);
35+
this.channel = channel;
36+
this.channelNumber = channelNumber;
37+
this.method = method;
38+
}
39+
40+
/**
41+
*
42+
* @return request method that timed out
43+
*/
44+
public Method getMethod() {
45+
return method;
46+
}
47+
48+
/**
49+
* channel that performed the call
50+
* @return
51+
*/
52+
public Object getChannel() {
53+
return channel;
54+
}
55+
56+
/**
57+
*
58+
* @return number of the channel that performed the call
59+
*/
60+
public int getChannelNumber() {
61+
return channelNumber;
62+
}
63+
}

src/main/java/com/rabbitmq/client/ConnectionFactory.java

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@
3333
import java.util.*;
3434
import java.util.concurrent.*;
3535

36+
import static java.util.concurrent.TimeUnit.*;
37+
3638
/**
3739
* Convenience "factory" class to facilitate opening a {@link Connection} to an AMQP broker.
3840
*/
@@ -73,6 +75,9 @@ public class ConnectionFactory implements Cloneable {
7375
* zero means wait indefinitely */
7476
public static final int DEFAULT_SHUTDOWN_TIMEOUT = 10000;
7577

78+
/** The default continuation timeout for RPC calls in channels: 10 minutes */
79+
public static final int DEFAULT_CHANNEL_RPC_TIMEOUT = (int) MINUTES.toMillis(10);
80+
7681
private static final String PREFERRED_TLS_PROTOCOL = "TLSv1.2";
7782

7883
private static final String FALLBACK_TLS_PROTOCOL = "TLSv1";
@@ -116,6 +121,12 @@ public class ConnectionFactory implements Cloneable {
116121

117122
private SSLContext sslContext;
118123

124+
/**
125+
* Continuation timeout on RPC calls.
126+
* @since 4.1.0
127+
*/
128+
private int channelRpcTimeout = DEFAULT_CHANNEL_RPC_TIMEOUT;
129+
119130
/** @return the default host to use for connections */
120131
public String getHost() {
121132
return host;
@@ -937,6 +948,7 @@ public ConnectionParams params(ExecutorService consumerWorkServiceExecutor) {
937948
result.setRequestedHeartbeat(requestedHeartbeat);
938949
result.setShutdownExecutor(shutdownExecutor);
939950
result.setHeartbeatExecutor(heartbeatExecutor);
951+
result.setChannelRpcTimeout(channelRpcTimeout);
940952
return result;
941953
}
942954

@@ -1079,4 +1091,24 @@ public void useNio() {
10791091
public void useBlockingIo() {
10801092
this.nio = false;
10811093
}
1094+
1095+
/**
1096+
* Set the continuation timeout for RPC calls in channels.
1097+
* Default is 10 minutes. 0 means no timeout.
1098+
* @param channelRpcTimeout
1099+
*/
1100+
public void setChannelRpcTimeout(int channelRpcTimeout) {
1101+
if(channelRpcTimeout < 0) {
1102+
throw new IllegalArgumentException("Timeout cannot be less than 0");
1103+
}
1104+
this.channelRpcTimeout = channelRpcTimeout;
1105+
}
1106+
1107+
/**
1108+
* Get the timeout for RPC calls in channels.
1109+
* @return
1110+
*/
1111+
public int getChannelRpcTimeout() {
1112+
return channelRpcTimeout;
1113+
}
10821114
}

src/main/java/com/rabbitmq/client/impl/AMQChannel.java

Lines changed: 32 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,11 @@
1919
import java.io.IOException;
2020
import java.util.concurrent.TimeoutException;
2121

22-
import com.rabbitmq.client.AlreadyClosedException;
23-
import com.rabbitmq.client.Command;
22+
import com.rabbitmq.client.*;
2423
import com.rabbitmq.client.Method;
25-
import com.rabbitmq.client.Connection;
26-
import com.rabbitmq.client.ShutdownSignalException;
2724
import com.rabbitmq.utility.BlockingValueOrException;
25+
import org.slf4j.Logger;
26+
import org.slf4j.LoggerFactory;
2827

2928
/**
3029
* Base class modelling an AMQ channel. Subclasses implement
@@ -37,6 +36,11 @@
3736
* @see Connection
3837
*/
3938
public abstract class AMQChannel extends ShutdownNotifierComponent {
39+
40+
private static final Logger LOGGER = LoggerFactory.getLogger(AMQChannel.class);
41+
42+
private static final int NO_RPC_TIMEOUT = 0;
43+
4044
/**
4145
* Protected; used instead of synchronizing on the channel itself,
4246
* so that clients can themselves use the channel to synchronize
@@ -59,6 +63,9 @@ public abstract class AMQChannel extends ShutdownNotifierComponent {
5963
/** Whether transmission of content-bearing methods should be blocked */
6064
public volatile boolean _blockContent = false;
6165

66+
/** Timeout for RPC calls */
67+
private final int _rpcTimeout;
68+
6269
/**
6370
* Construct a channel on the given connection, with the given channel number.
6471
* @param connection the underlying connection for this channel
@@ -67,6 +74,10 @@ public abstract class AMQChannel extends ShutdownNotifierComponent {
6774
public AMQChannel(AMQConnection connection, int channelNumber) {
6875
this._connection = connection;
6976
this._channelNumber = channelNumber;
77+
if(connection.getChannelRpcTimeout() < 0) {
78+
throw new IllegalArgumentException("Continuation timeout on RPC calls cannot be less than 0");
79+
}
80+
this._rpcTimeout = connection.getChannelRpcTimeout();
7081
}
7182

7283
/**
@@ -225,8 +236,23 @@ private AMQCommand privateRpc(Method m)
225236
//
226237
// Calling getReply() on the continuation puts us to sleep
227238
// until the connection's reader-thread throws the reply over
228-
// the fence.
229-
return k.getReply();
239+
// the fence or the RPC times out (if enabled)
240+
if(_rpcTimeout == NO_RPC_TIMEOUT) {
241+
return k.getReply();
242+
} else {
243+
try {
244+
return k.getReply(_rpcTimeout);
245+
} catch (TimeoutException e) {
246+
try {
247+
// clean RPC channel state
248+
nextOutstandingRpc();
249+
markRpcFinished();
250+
} catch(Exception ex) {
251+
LOGGER.warn("Error while cleaning timed out channel RPC: {}", ex.getMessage());
252+
}
253+
throw new ChannelContinuationTimeoutException(e, this, this._channelNumber, m);
254+
}
255+
}
230256
}
231257

232258
private AMQCommand privateRpc(Method m, int timeout)

src/main/java/com/rabbitmq/client/impl/AMQConnection.java

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -94,11 +94,7 @@ public static Map<String, Object> defaultClientProperties() {
9494
new Version(AMQP.PROTOCOL.MAJOR, AMQP.PROTOCOL.MINOR);
9595

9696
/** The special channel 0 (<i>not</i> managed by the <code><b>_channelManager</b></code>) */
97-
private final AMQChannel _channel0 = new AMQChannel(this, 0) {
98-
@Override public boolean processAsync(Command c) throws IOException {
99-
return getConnection().processControlCommand(c);
100-
}
101-
};
97+
private final AMQChannel _channel0;
10298

10399
protected ConsumerWorkService _workService = null;
104100

@@ -137,6 +133,7 @@ public static Map<String, Object> defaultClientProperties() {
137133
private final String password;
138134
private final Collection<BlockedListener> blockedListeners = new CopyOnWriteArrayList<BlockedListener>();
139135
protected final MetricsCollector metricsCollector;
136+
private final int channelRpcTimeout;
140137

141138
/* State modified after start - all volatile */
142139

@@ -228,6 +225,16 @@ public AMQConnection(ConnectionParams params, FrameHandler frameHandler, Metrics
228225
this.heartbeatExecutor = params.getHeartbeatExecutor();
229226
this.shutdownExecutor = params.getShutdownExecutor();
230227
this.threadFactory = params.getThreadFactory();
228+
if(params.getChannelRpcTimeout() < 0) {
229+
throw new IllegalArgumentException("Continuation timeout on RPC calls cannot be less than 0");
230+
}
231+
this.channelRpcTimeout = params.getChannelRpcTimeout();
232+
233+
this._channel0 = new AMQChannel(this, 0) {
234+
@Override public boolean processAsync(Command c) throws IOException {
235+
return getConnection().processControlCommand(c);
236+
}
237+
};
231238

232239
this._channelManager = null;
233240

@@ -1045,4 +1052,8 @@ public String getId() {
10451052
public void setId(String id) {
10461053
this.id = id;
10471054
}
1055+
1056+
public int getChannelRpcTimeout() {
1057+
return channelRpcTimeout;
1058+
}
10481059
}

src/main/java/com/rabbitmq/client/impl/ConnectionParams.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ public class ConnectionParams {
3939
private SaslConfig saslConfig;
4040
private long networkRecoveryInterval;
4141
private boolean topologyRecovery;
42+
private int channelRpcTimeout;
4243

4344
private ExceptionHandler exceptionHandler;
4445
private ThreadFactory threadFactory;
@@ -109,6 +110,10 @@ public ThreadFactory getThreadFactory() {
109110
return threadFactory;
110111
}
111112

113+
public int getChannelRpcTimeout() {
114+
return channelRpcTimeout;
115+
}
116+
112117
public void setUsername(String username) {
113118
this.username = username;
114119
}
@@ -180,4 +185,8 @@ public ScheduledExecutorService getHeartbeatExecutor() {
180185
public void setHeartbeatExecutor(ScheduledExecutorService heartbeatExecutor) {
181186
this.heartbeatExecutor = heartbeatExecutor;
182187
}
188+
189+
public void setChannelRpcTimeout(int channelRpcTimeout) {
190+
this.channelRpcTimeout = channelRpcTimeout;
191+
}
183192
}

0 commit comments

Comments
 (0)