Skip to content

Commit 349d7a6

Browse files
author
Alexandru Scvortov
committed
add waitForConfirmsOrDie
1 parent 9134d15 commit 349d7a6

File tree

2 files changed

+27
-2
lines changed

2 files changed

+27
-2
lines changed

src/com/rabbitmq/client/Channel.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -703,7 +703,14 @@ void basicNack(long deliveryTag, boolean multiple, boolean requeue)
703703
* non-Confirm channel, waitForConfirms returns true immediately.
704704
* @return whether all the messages were ack'd (and none were nack'd)
705705
*/
706-
boolean waitForConfirms() throws InterruptedException;
706+
boolean waitForConfirms() throws IOException, InterruptedException;
707+
708+
/** Wait untill all messages published since the last call have
709+
* been either ack'd or nack'd by the broker. If any of the
710+
* messages were nack'd, waitForConfirmsOrDie will throw an
711+
* IOException. When called on a non-Confirm channel, it will
712+
* return immediately. */
713+
void waitForConfirmsOrDie() throws IOException, InterruptedException;
707714

708715
/**
709716
* Asynchronously send a method over this channel.

src/com/rabbitmq/client/impl/ChannelN.java

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -188,13 +188,31 @@ public ConfirmListener getConfirmListener() {
188188

189189
/** {@inheritDoc} */
190190
public boolean waitForConfirms()
191-
throws InterruptedException
191+
throws IOException, InterruptedException
192+
{
193+
return waitForConfirmsInternal(false);
194+
}
195+
196+
/** {@inheritDoc} */
197+
public void waitForConfirmsOrDie()
198+
throws IOException, InterruptedException
199+
{
200+
waitForConfirmsInternal(true);
201+
}
202+
203+
protected boolean waitForConfirmsInternal(boolean dieOnNack)
204+
throws IOException, InterruptedException
192205
{
193206
synchronized (unconfirmedSet) {
194207
while (true) {
195208
if (getCloseReason() != null) {
196209
throw Utility.fixStackTrace(getCloseReason());
197210
}
211+
if (dieOnNack && nacksReceived) {
212+
close(AMQP.REPLY_SUCCESS, "OK", true,
213+
new RuntimeException("received nack"), false);
214+
throw Utility.fixStackTrace(getCloseReason());
215+
}
198216
if (unconfirmedSet.isEmpty()) {
199217
boolean noNacksReceived = !nacksReceived;
200218
nacksReceived = false;

0 commit comments

Comments
 (0)