Skip to content

Commit 3242827

Browse files
author
Hubert Plociniczak
committed
Allow for NullPointerException if channel does not exist.
Remove TimeoutBlockingRpcContinuation and add getReply with timeout to BlockingRpcContinuation instead. Clarified uninterruptibleGet(int) in BlockingCell.
1 parent 0db2e74 commit 3242827

File tree

3 files changed

+20
-22
lines changed

3 files changed

+20
-22
lines changed

src/com/rabbitmq/client/impl/AMQChannel.java

Lines changed: 7 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -212,7 +212,7 @@ public AMQCommand quiescingRpc(Method m,
212212
final AMQCommand timeoutReply)
213213
throws IOException, ShutdownSignalException, TimeoutException
214214
{
215-
TimeoutBlockingRpcContinuation k = new TimeoutBlockingRpcContinuation();
215+
SimpleBlockingRpcContinuation k = new SimpleBlockingRpcContinuation();
216216
transmitAndEnqueue(m, k);
217217
return k.getReply(timeoutMillisec);
218218
}
@@ -274,6 +274,12 @@ public T getReply() throws ShutdownSignalException
274274
{
275275
return _blocker.uninterruptibleGetValue();
276276
}
277+
278+
public T getReply(int timeout)
279+
throws ShutdownSignalException, TimeoutException
280+
{
281+
return _blocker.uninterruptibleGetValue(timeout);
282+
}
277283

278284
public abstract T transformReply(AMQCommand command);
279285
}
@@ -285,14 +291,4 @@ public AMQCommand transformReply(AMQCommand command) {
285291
return command;
286292
}
287293
}
288-
289-
public static class TimeoutBlockingRpcContinuation
290-
extends SimpleBlockingRpcContinuation
291-
{
292-
public AMQCommand getReply(int timeout)
293-
throws ShutdownSignalException, TimeoutException
294-
{
295-
return _blocker.uninterruptibleGetValue(timeout);
296-
}
297-
}
298294
}

src/com/rabbitmq/client/impl/AMQConnection.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -440,8 +440,8 @@ public MainLoop() {
440440
// channel zero that aren't Connection.CloseOk) must
441441
// be discarded.
442442
ChannelN channel = _channelManager.getChannel(frame.channel);
443-
if (channel != null)
444-
channel.handleFrame(frame);
443+
// FIXME: catch NullPointerException and throw more informative one?
444+
channel.handleFrame(frame);
445445
}
446446
}
447447
}

src/com/rabbitmq/utility/BlockingCell.java

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -95,30 +95,32 @@ public synchronized T uninterruptibleGet() {
9595

9696
/**
9797
* As get(long timeout), but catches and ignores InterruptedException, retrying until
98-
* a value appears or until timeout original is reached. If timeout is reached,
98+
* a value appears or until specified timeout is reached. If timeout is reached,
9999
* TimeoutException it thrown.
100100
* We also use System.nanoTime() to behave correctly when system clock jumps around.
101101
*
102-
* @param timeout timeout in miliseconds. Value less than zero effectively means infinity
102+
* @param timeout timeout in miliseconds. 0 effectively means infinity
103103
* @return the waited-for value
104104
*/
105105
public synchronized T uninterruptibleGet(int timeout) throws TimeoutException {
106-
long runTime = System.nanoTime() / NANOS_IN_MILLI + timeout;
107-
long now = 0;
106+
long now = System.nanoTime() / NANOS_IN_MILLI;
107+
long runTime = now + timeout;
108+
108109

109110
if (timeout < 0) {
110111
throw new AssertionError("Timeout cannot be less than zero");
111112
}
112-
113-
while ((timeout == 0) || ((now = System.nanoTime() / NANOS_IN_MILLI) < runTime)) {
114-
try {
113+
114+
do {
115+
try {
115116
synchronized(this) {
116-
return get(timeout != 0 ? (runTime - now) : 0);
117+
return get(runTime - now);
117118
}
118119
} catch (InterruptedException e) {
119120
// Ignore.
120121
}
121-
}
122+
} while ((timeout == 0) || ((now = System.nanoTime() / NANOS_IN_MILLI) < runTime));
123+
122124
throw new TimeoutException();
123125
}
124126

0 commit comments

Comments
 (0)