Skip to content

Commit 98b5c93

Browse files
author
Alexandru Scvortov
committed
merge bug 24538 into default (WaitForConfirms with timeouts)
2 parents becedd1 + 0d0159c commit 98b5c93

File tree

6 files changed

+163
-143
lines changed

6 files changed

+163
-143
lines changed

src/com/rabbitmq/client/Channel.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -639,7 +639,7 @@ void basicNack(long deliveryTag, boolean multiple, boolean requeue)
639639
* Cancel a consumer. Calls the consumer's {@link Consumer#handleCancelOk}
640640
* method.
641641
* @param consumerTag a client- or server-generated consumer tag to establish context
642-
* @throws java.io.IOException if an error is encountered
642+
* @throws IOException if an error is encountered, or if the consumerTag is unknown
643643
* @see com.rabbitmq.client.AMQP.Basic.Cancel
644644
* @see com.rabbitmq.client.AMQP.Basic.CancelOk
645645
*/
@@ -650,12 +650,12 @@ void basicNack(long deliveryTag, boolean multiple, boolean requeue)
650650
* basic.recover is asynchronous; in 0-9-1 it is synchronous, and
651651
* the new, deprecated method basic.recover_async is asynchronous.
652652
* <p/>
653-
* Equivalent to calling <code>basicRecover(true)</code>, messages
654-
* will be requeued and possibly delivered to a different consumer.
653+
* Equivalent to calling <code>basicRecover(true)</code>, messages
654+
* will be requeued and possibly delivered to a different consumer.
655655
* @see #basicRecover(boolean)
656656
*/
657657
Basic.RecoverOk basicRecover() throws IOException;
658-
658+
659659
/**
660660
* Ask the broker to resend unacknowledged messages. In 0-8
661661
* basic.recover is asynchronous; in 0-9-1 it is synchronous, and

src/com/rabbitmq/client/impl/AMQConnection.java

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -578,9 +578,7 @@ public boolean processControlCommand(Command c) throws IOException
578578
// Already shutting down, so just send back a CloseOk.
579579
try {
580580
_channel0.quiescingTransmit(new AMQP.Connection.CloseOk.Builder().build());
581-
} catch (IOException ioe) {
582-
Utility.emptyStatement();
583-
}
581+
} catch (IOException _) { } // ignore
584582
return true;
585583
} else if (method instanceof AMQP.Connection.CloseOk) {
586584
// It's our final "RPC". Time to shut down.
@@ -599,9 +597,7 @@ public void handleConnectionClose(Command closeCommand) {
599597
ShutdownSignalException sse = shutdown(closeCommand, false, null, false);
600598
try {
601599
_channel0.quiescingTransmit(new AMQP.Connection.CloseOk.Builder().build());
602-
} catch (IOException ioe) {
603-
Utility.emptyStatement();
604-
}
600+
} catch (IOException _) { } // ignore
605601
_brokerInitiatedShutdown = true;
606602
Thread scw = new SocketCloseWait(sse);
607603
scw.setName("AMQP Connection Closing Monitor " +
@@ -708,9 +704,7 @@ public void abort(int closeCode, String closeMessage, int timeout)
708704
{
709705
try {
710706
close(closeCode, closeMessage, true, null, timeout, true);
711-
} catch (IOException e) {
712-
Utility.emptyStatement();
713-
}
707+
} catch (IOException _) { } // ignore
714708
}
715709

716710
/**

src/com/rabbitmq/client/impl/ChannelN.java

Lines changed: 21 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -121,10 +121,8 @@ public ChannelN(AMQConnection connection, int channelNumber,
121121
* @throws IOException if any problem is encountered
122122
*/
123123
public void open() throws IOException {
124-
// wait for the Channel.OpenOk response, then ignore it
125-
Channel.OpenOk openOk =
126-
(Channel.OpenOk) exnWrappingRpc(new Channel.Open(UNSPECIFIED_OUT_OF_BAND)).getMethod();
127-
Utility.use(openOk);
124+
// wait for the Channel.OpenOk response, and ignore it
125+
exnWrappingRpc(new Channel.Open(UNSPECIFIED_OUT_OF_BAND));
128126
}
129127

130128
public void addReturnListener(ReturnListener listener) {
@@ -267,7 +265,7 @@ private CountDownLatch broadcastShutdownSignal(ShutdownSignalException signal) {
267265
}
268266
}
269267

270-
public CountDownLatch getShutdownLatch() {
268+
CountDownLatch getShutdownLatch() {
271269
return this.finishedShutdownFlag;
272270
}
273271

@@ -421,7 +419,7 @@ private void callReturnListeners(Command command, Basic.Return basicReturn) {
421419
}
422420
}
423421

424-
private void callFlowListeners(Command command, Channel.Flow channelFlow) {
422+
private void callFlowListeners(@SuppressWarnings("unused") Command command, Channel.Flow channelFlow) {
425423
try {
426424
for (FlowListener l : this.flowListeners) {
427425
l.handleFlow(channelFlow.getActive());
@@ -431,7 +429,7 @@ private void callFlowListeners(Command command, Channel.Flow channelFlow) {
431429
}
432430
}
433431

434-
private void callConfirmListeners(Command command, Basic.Ack ack) {
432+
private void callConfirmListeners(@SuppressWarnings("unused") Command command, Basic.Ack ack) {
435433
try {
436434
for (ConfirmListener l : this.confirmListeners) {
437435
l.handleAck(ack.getDeliveryTag(), ack.getMultiple());
@@ -441,7 +439,7 @@ private void callConfirmListeners(Command command, Basic.Ack ack) {
441439
}
442440
}
443441

444-
private void callConfirmListeners(Command command, Basic.Nack nack) {
442+
private void callConfirmListeners(@SuppressWarnings("unused") Command command, Basic.Nack nack) {
445443
try {
446444
for (ConfirmListener l : this.confirmListeners) {
447445
l.handleNack(nack.getDeliveryTag(), nack.getMultiple());
@@ -496,10 +494,17 @@ public void abort(int closeCode, String closeMessage)
496494
close(closeCode, closeMessage, true, null, true);
497495
}
498496

497+
// TODO: method should be private
499498
/**
500499
* Protected API - Close channel with code and message, indicating
501500
* the source of the closure and a causing exception (null if
502501
* none).
502+
* @param closeCode the close code (See under "Reply Codes" in the AMQP specification)
503+
* @param closeMessage a message indicating the reason for closing the connection
504+
* @param initiatedByApplication true if this comes from an API call, false otherwise
505+
* @param cause exception triggering close
506+
* @param abort true if we should close and ignore errors
507+
* @throws IOException if an error is encountered
503508
*/
504509
public void close(int closeCode,
505510
String closeMessage,
@@ -941,23 +946,22 @@ public String transformReply(AMQCommand replyCommand) {
941946
public void basicCancel(final String consumerTag)
942947
throws IOException
943948
{
949+
final Consumer originalConsumer = _consumers.get(consumerTag);
950+
if (originalConsumer == null)
951+
throw new IOException("Unknown consumerTag");
944952
BlockingRpcContinuation<Consumer> k = new BlockingRpcContinuation<Consumer>() {
945953
public Consumer transformReply(AMQCommand replyCommand) {
946-
Basic.CancelOk dummy = (Basic.CancelOk) replyCommand.getMethod();
947-
Utility.use(dummy);
948-
Consumer callback = _consumers.remove(consumerTag);
949-
// We need to call back inside the connection thread
950-
// in order avoid races with 'deliver' commands
951-
dispatcher.handleCancelOk(callback, consumerTag);
952-
return callback;
954+
replyCommand.getMethod();
955+
_consumers.remove(consumerTag); //may already have been removed
956+
dispatcher.handleCancelOk(originalConsumer, consumerTag);
957+
return originalConsumer;
953958
}
954959
};
955960

956961
rpc(new Basic.Cancel(consumerTag, false), k);
957962

958963
try {
959-
Consumer callback = k.getReply();
960-
Utility.use(callback);
964+
k.getReply(); // discard result
961965
} catch(ShutdownSignalException ex) {
962966
throw wrap(ex);
963967
}

src/com/rabbitmq/utility/Utility.java

Lines changed: 10 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -33,9 +33,9 @@ public ThrowableCreatedElsewhere(Throwable throwable) {
3333
this.setStackTrace(throwable.getStackTrace());
3434
}
3535

36-
@Override public Throwable fillInStackTrace(){
37-
return this;
38-
}
36+
@Override public Throwable fillInStackTrace(){
37+
return this;
38+
}
3939
}
4040

4141
public static <T extends Throwable & SensibleClone<T>> T fixStackTrace(T throwable) {
@@ -44,19 +44,19 @@ public static <T extends Throwable & SensibleClone<T>> T fixStackTrace(T throwab
4444
if(throwable.getCause() == null) {
4545
// We'd like to preserve the original stack trace in the cause.
4646
// Unfortunately Java doesn't let you set the cause once it's been
47-
// set once. This means we have to choose between either
47+
// set once. This means we have to choose between either
4848
// - not preserving the type
4949
// - sometimes losing the original stack trace
5050
// - performing nasty reflective voodoo which may or may not work
5151
// We only lose the original stack trace when there's a root cause
52-
// which will hopefully be enlightening enough on its own that it
53-
// doesn't matter too much.
52+
// which will hopefully be enlightening enough on its own that it
53+
// doesn't matter too much.
5454
try {
5555
throwable.initCause(new ThrowableCreatedElsewhere(throwable));
5656
} catch(IllegalStateException e) {
57-
// This exception was explicitly initialised with a null cause.
58-
// Alas this means we can't set the cause even though it has none.
59-
// Thanks.
57+
// This exception was explicitly initialised with a null cause.
58+
// Alas this means we can't set the cause even though it has none.
59+
// Thanks.
6060
}
6161
}
6262

@@ -70,7 +70,7 @@ public static <T extends Throwable & SensibleClone<T>> T fixStackTrace(T throwab
7070
return throwable;
7171
}
7272

73-
73+
7474
public static String makeStackTrace(Throwable throwable) {
7575
ByteArrayOutputStream baOutStream = new ByteArrayOutputStream();
7676
PrintStream printStream = new PrintStream(baOutStream, false);
@@ -80,22 +80,4 @@ public static String makeStackTrace(Throwable throwable) {
8080
printStream.close(); // closes baOutStream
8181
return text;
8282
}
83-
84-
/**
85-
* Used to indicate that we are not, in fact, using a variable, also to silence compiler warnings.
86-
*
87-
* @param value the object we're pretending to use
88-
*/
89-
public static void use(Object value) {
90-
if (value != null) {
91-
return;
92-
}
93-
}
94-
95-
/**
96-
* Similarly, for situations where an empty statement is required
97-
*/
98-
public static void emptyStatement() {
99-
use(null);
100-
}
10183
}

0 commit comments

Comments
 (0)