Skip to content

Commit 4d8b097

Browse files
committed
Set default continuation timeout to 10 minutes
Fixes #219
1 parent 03db046 commit 4d8b097

File tree

7 files changed

+36
-15
lines changed

7 files changed

+36
-15
lines changed

src/main/java/com/rabbitmq/client/ChannelRpcTimeoutException.java renamed to src/main/java/com/rabbitmq/client/ChannelContinuationTimeoutException.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,13 @@
44
import java.util.concurrent.TimeoutException;
55

66
/**
7-
* Exception thrown when a channel times out on a RPC call.
7+
* Exception thrown when a channel times out on a continuation during a RPC call.
88
* @since 4.1.0
99
*/
10-
public class ChannelRpcTimeoutException extends IOException {
10+
public class ChannelContinuationTimeoutException extends IOException {
1111

1212
/**
13-
* The channel that performed the RPC.
13+
* The channel that performed the call.
1414
*/
1515
private final Object channel;
1616

@@ -19,7 +19,7 @@ public class ChannelRpcTimeoutException extends IOException {
1919
*/
2020
private final Method method;
2121

22-
public ChannelRpcTimeoutException(TimeoutException cause, Object channel, Method method) {
22+
public ChannelContinuationTimeoutException(TimeoutException cause, Object channel, Method method) {
2323
super(cause);
2424
this.channel = channel;
2525
this.method = method;

src/main/java/com/rabbitmq/client/ConnectionFactory.java

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@
3333
import java.util.*;
3434
import java.util.concurrent.*;
3535

36+
import static java.util.concurrent.TimeUnit.*;
37+
3638
/**
3739
* Convenience "factory" class to facilitate opening a {@link Connection} to an AMQP broker.
3840
*/
@@ -73,8 +75,8 @@ public class ConnectionFactory implements Cloneable {
7375
* zero means wait indefinitely */
7476
public static final int DEFAULT_SHUTDOWN_TIMEOUT = 10000;
7577

76-
/** The default timeout for RPC calls in channels: no timeout */
77-
public static final int DEFAULT_CHANNEL_RPC_TIMEOUT = -1;
78+
/** The default continuation timeout for RPC calls in channels: 10 minutes */
79+
public static final int DEFAULT_CHANNEL_RPC_TIMEOUT = (int) MINUTES.toMillis(10);
7880

7981
private static final String PREFERRED_TLS_PROTOCOL = "TLSv1.2";
8082

@@ -120,7 +122,7 @@ public class ConnectionFactory implements Cloneable {
120122
private SSLContext sslContext;
121123

122124
/**
123-
* RPC timeout.
125+
* Continuation timeout on RPC calls.
124126
* @since 4.1.0
125127
*/
126128
private int channelRpcTimeout = DEFAULT_CHANNEL_RPC_TIMEOUT;
@@ -1091,11 +1093,14 @@ public void useBlockingIo() {
10911093
}
10921094

10931095
/**
1094-
* Set the timeout for RPC calls in channels.
1095-
* Default is no timeout.
1096+
* Set the continuation timeout for RPC calls in channels.
1097+
* Default is 10 minutes. 0 means no timeout.
10961098
* @param channelRpcTimeout
10971099
*/
10981100
public void setChannelRpcTimeout(int channelRpcTimeout) {
1101+
if(channelRpcTimeout < 0) {
1102+
throw new IllegalArgumentException("Timeout cannot be less than 0");
1103+
}
10991104
this.channelRpcTimeout = channelRpcTimeout;
11001105
}
11011106

src/main/java/com/rabbitmq/client/impl/AMQChannel.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ public abstract class AMQChannel extends ShutdownNotifierComponent {
3939

4040
private static final Logger LOGGER = LoggerFactory.getLogger(AMQChannel.class);
4141

42-
private static final int NO_RPC_TIMEOUT = ConnectionFactory.DEFAULT_CHANNEL_RPC_TIMEOUT;
42+
private static final int NO_RPC_TIMEOUT = 0;
4343

4444
/**
4545
* Protected; used instead of synchronizing on the channel itself,
@@ -74,6 +74,9 @@ public abstract class AMQChannel extends ShutdownNotifierComponent {
7474
public AMQChannel(AMQConnection connection, int channelNumber) {
7575
this._connection = connection;
7676
this._channelNumber = channelNumber;
77+
if(connection.getChannelRpcTimeout() < 0) {
78+
throw new IllegalArgumentException("Continuation timeout on RPC calls cannot be less than 0");
79+
}
7780
this._rpcTimeout = connection.getChannelRpcTimeout();
7881
}
7982

@@ -235,6 +238,7 @@ private AMQCommand privateRpc(Method m)
235238
// until the connection's reader-thread throws the reply over
236239
// the fence or the RPC times out (if enabled)
237240
if(_rpcTimeout == NO_RPC_TIMEOUT) {
241+
System.out.println("passe");
238242
return k.getReply();
239243
} else {
240244
try {
@@ -247,7 +251,7 @@ private AMQCommand privateRpc(Method m)
247251
} catch(Exception ex) {
248252
LOGGER.warn("Error while cleaning timed out channel RPC: {}", ex.getMessage());
249253
}
250-
throw new ChannelRpcTimeoutException(e, this, m);
254+
throw new ChannelContinuationTimeoutException(e, this, m);
251255
}
252256
}
253257
}

src/main/java/com/rabbitmq/client/impl/AMQConnection.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -225,6 +225,9 @@ public AMQConnection(ConnectionParams params, FrameHandler frameHandler, Metrics
225225
this.heartbeatExecutor = params.getHeartbeatExecutor();
226226
this.shutdownExecutor = params.getShutdownExecutor();
227227
this.threadFactory = params.getThreadFactory();
228+
if(params.getChannelRpcTimeout() < 0) {
229+
throw new IllegalArgumentException("Continuation timeout on RPC calls cannot be less than 0");
230+
}
228231
this.channelRpcTimeout = params.getChannelRpcTimeout();
229232

230233
this._channel0 = new AMQChannel(this, 0) {

src/test/java/com/rabbitmq/client/test/AMQChannelTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515

1616
package com.rabbitmq.client.test;
1717

18-
import com.rabbitmq.client.ChannelRpcTimeoutException;
18+
import com.rabbitmq.client.ChannelContinuationTimeoutException;
1919
import com.rabbitmq.client.Command;
2020
import com.rabbitmq.client.Method;
2121
import com.rabbitmq.client.impl.AMQChannel;
@@ -66,7 +66,7 @@ public class AMQChannelTest {
6666
try {
6767
channel.rpc(method);
6868
fail("Should time out and throw an exception");
69-
} catch(ChannelRpcTimeoutException e) {
69+
} catch(ChannelContinuationTimeoutException e) {
7070
// OK
7171
assertThat((DummyAmqChannel) e.getChannel(), is(channel));
7272
assertThat(e.getMethod(), is(method));

src/test/java/com/rabbitmq/client/test/AMQConnectionTest.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,16 @@ public class AMQConnectionTest {
9999
cf.setHandshakeTimeout(7000);
100100
}
101101

102+
@Test public void negativeRpcTimeoutIsForbidden() {
103+
ConnectionFactory cf = TestUtils.connectionFactory();
104+
try {
105+
cf.setChannelRpcTimeout(-10);
106+
fail("expected an exception");
107+
} catch (IllegalArgumentException _ignored) {
108+
// expected
109+
}
110+
}
111+
102112
/** Check the AMQConnection does send exactly 1 initial header, and deal correctly with
103113
* the frame handler throwing an exception when we try to read data
104114
*/

src/test/java/com/rabbitmq/client/test/ChannelRpcTimeoutIntegrationTest.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -71,10 +71,9 @@ public void tearDown() throws Exception {
7171
connection.start();
7272
Channel channel = connection.createChannel();
7373
try {
74-
7574
channel.queueDeclare();
7675
fail("Should time out and throw an exception");
77-
} catch(ChannelRpcTimeoutException e) {
76+
} catch(ChannelContinuationTimeoutException e) {
7877
// OK
7978
assertThat((Channel) e.getChannel(), is(channel));
8079
assertThat(e.getMethod(), instanceOf(AMQP.Queue.Declare.class));

0 commit comments

Comments
 (0)