Skip to content

Commit fc6ab7b

Browse files
author
Emile Joubert
committed
Merged bug24648 into default
2 parents acd36ab + ec6b38d commit fc6ab7b

File tree

6 files changed

+78
-55
lines changed

6 files changed

+78
-55
lines changed

build.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -491,3 +491,4 @@
491491
</bundlor:bundlor>
492492
</target>
493493
</project>
494+

src/com/rabbitmq/client/Channel.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import java.io.IOException;
2020
import java.util.Map;
21+
import java.util.concurrent.TimeoutException;
2122

2223
import com.rabbitmq.client.AMQP.BasicProperties;
2324
import com.rabbitmq.client.AMQP.Exchange;
@@ -726,13 +727,31 @@ void basicNack(long deliveryTag, boolean multiple, boolean requeue)
726727
*/
727728
boolean waitForConfirms() throws InterruptedException;
728729

730+
/**
731+
* Wait until all messages published since the last call have been
732+
* either ack'd or nack'd by the broker; or until timeout elapses.
733+
* If the timeout expires a TimeoutException is thrown. When
734+
* called on a non-Confirm channel, waitForConfirms returns true
735+
* immediately.
736+
* @return whether all the messages were ack'd (and none were nack'd)
737+
*/
738+
boolean waitForConfirms(long timeout) throws InterruptedException, TimeoutException;
739+
729740
/** Wait until all messages published since the last call have
730741
* been either ack'd or nack'd by the broker. If any of the
731742
* messages were nack'd, waitForConfirmsOrDie will throw an
732743
* IOException. When called on a non-Confirm channel, it will
733744
* return immediately. */
734745
void waitForConfirmsOrDie() throws IOException, InterruptedException;
735746

747+
/** Wait until all messages published since the last call have
748+
* been either ack'd or nack'd by the broker; or until timeout elapses.
749+
* If the timeout expires a TimeoutException is thrown. If any of the
750+
* messages were nack'd, waitForConfirmsOrDie will throw an
751+
* IOException. When called on a non-Confirm channel, it will
752+
* return immediately. */
753+
void waitForConfirmsOrDie(long timeout) throws IOException, InterruptedException, TimeoutException;
754+
736755
/**
737756
* Asynchronously send a method over this channel.
738757
* @param method method to transmit over this channel.

src/com/rabbitmq/client/impl/AMQConnection.java

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -578,9 +578,7 @@ public boolean processControlCommand(Command c) throws IOException
578578
// Already shutting down, so just send back a CloseOk.
579579
try {
580580
_channel0.quiescingTransmit(new AMQP.Connection.CloseOk.Builder().build());
581-
} catch (IOException ioe) {
582-
Utility.emptyStatement();
583-
}
581+
} catch (IOException _) { } // ignore
584582
return true;
585583
} else if (method instanceof AMQP.Connection.CloseOk) {
586584
// It's our final "RPC". Time to shut down.
@@ -599,9 +597,7 @@ public void handleConnectionClose(Command closeCommand) {
599597
ShutdownSignalException sse = shutdown(closeCommand, false, null, false);
600598
try {
601599
_channel0.quiescingTransmit(new AMQP.Connection.CloseOk.Builder().build());
602-
} catch (IOException ioe) {
603-
Utility.emptyStatement();
604-
}
600+
} catch (IOException _) { } // ignore
605601
_brokerInitiatedShutdown = true;
606602
Thread scw = new SocketCloseWait(sse);
607603
scw.setName("AMQP Connection Closing Monitor " +
@@ -708,9 +704,7 @@ public void abort(int closeCode, String closeMessage, int timeout)
708704
{
709705
try {
710706
close(closeCode, closeMessage, true, null, timeout, true);
711-
} catch (IOException e) {
712-
Utility.emptyStatement();
713-
}
707+
} catch (IOException _) { } // ignore
714708
}
715709

716710
/**

src/com/rabbitmq/client/impl/ChannelN.java

Lines changed: 42 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -121,10 +121,8 @@ public ChannelN(AMQConnection connection, int channelNumber,
121121
* @throws IOException if any problem is encountered
122122
*/
123123
public void open() throws IOException {
124-
// wait for the Channel.OpenOk response, then ignore it
125-
Channel.OpenOk openOk =
126-
(Channel.OpenOk) exnWrappingRpc(new Channel.Open(UNSPECIFIED_OUT_OF_BAND)).getMethod();
127-
Utility.use(openOk);
124+
// wait for the Channel.OpenOk response, and ignore it
125+
exnWrappingRpc(new Channel.Open(UNSPECIFIED_OUT_OF_BAND));
128126
}
129127

130128
public void addReturnListener(ReturnListener listener) {
@@ -167,6 +165,17 @@ public void clearConfirmListeners() {
167165
public boolean waitForConfirms()
168166
throws InterruptedException
169167
{
168+
boolean confirms = false;
169+
try {
170+
confirms = waitForConfirms(0L);
171+
} catch (TimeoutException e) { }
172+
return confirms;
173+
}
174+
175+
/** {@inheritDoc} */
176+
public boolean waitForConfirms(long timeout)
177+
throws InterruptedException, TimeoutException {
178+
long startTime = System.currentTimeMillis();
170179
synchronized (unconfirmedSet) {
171180
while (true) {
172181
if (getCloseReason() != null) {
@@ -177,7 +186,16 @@ public boolean waitForConfirms()
177186
onlyAcksReceived = true;
178187
return aux;
179188
}
180-
unconfirmedSet.wait();
189+
if (timeout == 0L) {
190+
unconfirmedSet.wait();
191+
} else {
192+
long elapsed = System.currentTimeMillis() - startTime;
193+
if (timeout > elapsed) {
194+
unconfirmedSet.wait(timeout - elapsed);
195+
} else {
196+
throw new TimeoutException();
197+
}
198+
}
181199
}
182200
}
183201
}
@@ -186,9 +204,23 @@ public boolean waitForConfirms()
186204
public void waitForConfirmsOrDie()
187205
throws IOException, InterruptedException
188206
{
189-
if (!waitForConfirms()) {
190-
close(AMQP.REPLY_SUCCESS, "NACKS RECEIVED", true, null, false);
191-
throw new IOException("nacks received");
207+
try {
208+
waitForConfirmsOrDie(0L);
209+
} catch (TimeoutException e) { }
210+
}
211+
212+
/** {@inheritDoc} */
213+
public void waitForConfirmsOrDie(long timeout)
214+
throws IOException, InterruptedException, TimeoutException
215+
{
216+
try {
217+
if (!waitForConfirms(timeout)) {
218+
close(AMQP.REPLY_SUCCESS, "NACKS RECEIVED", true, null, false);
219+
throw new IOException("nacks received");
220+
}
221+
} catch (TimeoutException e) {
222+
close(AMQP.PRECONDITION_FAILED, "TIMEOUT WAITING FOR ACK");
223+
throw(e);
192224
}
193225
}
194226

@@ -919,8 +951,7 @@ public void basicCancel(final String consumerTag)
919951
throw new IOException("Unknown consumerTag");
920952
BlockingRpcContinuation<Consumer> k = new BlockingRpcContinuation<Consumer>() {
921953
public Consumer transformReply(AMQCommand replyCommand) {
922-
Basic.CancelOk dummy = (Basic.CancelOk) replyCommand.getMethod();
923-
Utility.use(dummy);
954+
replyCommand.getMethod();
924955
_consumers.remove(consumerTag); //may already have been removed
925956
dispatcher.handleCancelOk(originalConsumer, consumerTag);
926957
return originalConsumer;
@@ -930,8 +961,7 @@ public Consumer transformReply(AMQCommand replyCommand) {
930961
rpc(new Basic.Cancel(consumerTag, false), k);
931962

932963
try {
933-
Consumer callback = k.getReply();
934-
Utility.use(callback);
964+
k.getReply(); // discard result
935965
} catch(ShutdownSignalException ex) {
936966
throw wrap(ex);
937967
}

src/com/rabbitmq/utility/Utility.java

Lines changed: 10 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -33,9 +33,9 @@ public ThrowableCreatedElsewhere(Throwable throwable) {
3333
this.setStackTrace(throwable.getStackTrace());
3434
}
3535

36-
@Override public Throwable fillInStackTrace(){
37-
return this;
38-
}
36+
@Override public Throwable fillInStackTrace(){
37+
return this;
38+
}
3939
}
4040

4141
public static <T extends Throwable & SensibleClone<T>> T fixStackTrace(T throwable) {
@@ -44,19 +44,19 @@ public static <T extends Throwable & SensibleClone<T>> T fixStackTrace(T throwab
4444
if(throwable.getCause() == null) {
4545
// We'd like to preserve the original stack trace in the cause.
4646
// Unfortunately Java doesn't let you set the cause once it's been
47-
// set once. This means we have to choose between either
47+
// set once. This means we have to choose between either
4848
// - not preserving the type
4949
// - sometimes losing the original stack trace
5050
// - performing nasty reflective voodoo which may or may not work
5151
// We only lose the original stack trace when there's a root cause
52-
// which will hopefully be enlightening enough on its own that it
53-
// doesn't matter too much.
52+
// which will hopefully be enlightening enough on its own that it
53+
// doesn't matter too much.
5454
try {
5555
throwable.initCause(new ThrowableCreatedElsewhere(throwable));
5656
} catch(IllegalStateException e) {
57-
// This exception was explicitly initialised with a null cause.
58-
// Alas this means we can't set the cause even though it has none.
59-
// Thanks.
57+
// This exception was explicitly initialised with a null cause.
58+
// Alas this means we can't set the cause even though it has none.
59+
// Thanks.
6060
}
6161
}
6262

@@ -70,7 +70,7 @@ public static <T extends Throwable & SensibleClone<T>> T fixStackTrace(T throwab
7070
return throwable;
7171
}
7272

73-
73+
7474
public static String makeStackTrace(Throwable throwable) {
7575
ByteArrayOutputStream baOutStream = new ByteArrayOutputStream();
7676
PrintStream printStream = new PrintStream(baOutStream, false);
@@ -80,22 +80,4 @@ public static String makeStackTrace(Throwable throwable) {
8080
printStream.close(); // closes baOutStream
8181
return text;
8282
}
83-
84-
/**
85-
* Used to indicate that we are not, in fact, using a variable, also to silence compiler warnings.
86-
*
87-
* @param value the object we're pretending to use
88-
*/
89-
public static void use(Object value) {
90-
if (value != null) {
91-
return;
92-
}
93-
}
94-
95-
/**
96-
* Similarly, for situations where an empty statement is required
97-
*/
98-
public static void emptyStatement() {
99-
use(null);
100-
}
10183
}

test/src/com/rabbitmq/examples/TestMain.java

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,6 @@
3939
import com.rabbitmq.client.impl.FrameHandler;
4040
import com.rabbitmq.client.impl.SocketFrameHandler;
4141
import com.rabbitmq.utility.BlockingCell;
42-
import com.rabbitmq.utility.Utility;
4342

4443
public class TestMain {
4544
public static void main(String[] args) throws IOException, URISyntaxException {
@@ -199,9 +198,7 @@ public static void runProducerConsumerTest(String uri, int commitEvery)
199198
public static void sleep(int ms) {
200199
try {
201200
Thread.sleep(ms);
202-
} catch (InterruptedException ie) {
203-
Utility.emptyStatement();
204-
}
201+
} catch (InterruptedException _) { } // ignore
205202
}
206203

207204
private Connection _connection;
@@ -232,7 +229,7 @@ public void run() throws IOException {
232229
final int batchSize = 5;
233230

234231
_ch1 = createChannel();
235-
232+
236233
String queueName =_ch1.queueDeclare().getQueue();
237234

238235
sendLotsOfTrivialMessages(batchSize, queueName);
@@ -488,7 +485,7 @@ public void tryBasicReturn() throws IOException {
488485
unsetChannelReturnListener();
489486

490487
log("Completed basic.return testing.");
491-
488+
492489
}
493490

494491
private void unsetChannelReturnListener() {

0 commit comments

Comments
 (0)