Skip to content

Commit 54fcb2c

Browse files
author
Emile Joubert
committed
WaitForConfirms with timeouts
1 parent 0f33c72 commit 54fcb2c

File tree

2 files changed

+50
-2
lines changed

2 files changed

+50
-2
lines changed

src/com/rabbitmq/client/Channel.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import java.io.IOException;
2020
import java.util.Map;
21+
import java.util.concurrent.TimeoutException;
2122

2223
import com.rabbitmq.client.AMQP.BasicProperties;
2324
import com.rabbitmq.client.AMQP.Exchange;
@@ -726,13 +727,31 @@ void basicNack(long deliveryTag, boolean multiple, boolean requeue)
726727
*/
727728
boolean waitForConfirms() throws InterruptedException;
728729

730+
/**
731+
* Wait until all messages published since the last call have been
732+
* either ack'd or nack'd by the broker; or until timeout elapses.
733+
* If the timeout expires a TimeoutException is thrown. When
734+
* called on a non-Confirm channel, waitForConfirms returns true
735+
* immediately.
736+
* @return whether all the messages were ack'd (and none were nack'd)
737+
*/
738+
boolean waitForConfirms(long timeout) throws InterruptedException, TimeoutException;
739+
729740
/** Wait until all messages published since the last call have
730741
* been either ack'd or nack'd by the broker. If any of the
731742
* messages were nack'd, waitForConfirmsOrDie will throw an
732743
* IOException. When called on a non-Confirm channel, it will
733744
* return immediately. */
734745
void waitForConfirmsOrDie() throws IOException, InterruptedException;
735746

747+
/** Wait until all messages published since the last call have
748+
* been either ack'd or nack'd by the broker; or until timeout elapses.
749+
* If the timeout expires a TimeoutException is thrown. If any of the
750+
* messages were nack'd, waitForConfirmsOrDie will throw an
751+
* IOException. When called on a non-Confirm channel, it will
752+
* return immediately. */
753+
void waitForConfirmsOrDie(long timeout) throws IOException, InterruptedException, TimeoutException;
754+
736755
/**
737756
* Asynchronously send a method over this channel.
738757
* @param method method to transmit over this channel.

src/com/rabbitmq/client/impl/ChannelN.java

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,17 @@ public void clearConfirmListeners() {
167167
public boolean waitForConfirms()
168168
throws InterruptedException
169169
{
170+
boolean confirms = false;
171+
try {
172+
confirms = waitForConfirms(0L);
173+
} catch (TimeoutException e) { }
174+
return confirms;
175+
}
176+
177+
/** {@inheritDoc} */
178+
public boolean waitForConfirms(long timeout)
179+
throws InterruptedException, TimeoutException {
180+
long startTime = System.currentTimeMillis();
170181
synchronized (unconfirmedSet) {
171182
while (true) {
172183
if (getCloseReason() != null) {
@@ -177,7 +188,16 @@ public boolean waitForConfirms()
177188
onlyAcksReceived = true;
178189
return aux;
179190
}
180-
unconfirmedSet.wait();
191+
if (timeout == 0L) {
192+
unconfirmedSet.wait();
193+
} else {
194+
long elapsed = System.currentTimeMillis() - startTime;
195+
if (timeout > elapsed) {
196+
unconfirmedSet.wait(timeout - elapsed);
197+
} else {
198+
throw new TimeoutException();
199+
}
200+
}
181201
}
182202
}
183203
}
@@ -186,7 +206,16 @@ public boolean waitForConfirms()
186206
public void waitForConfirmsOrDie()
187207
throws IOException, InterruptedException
188208
{
189-
if (!waitForConfirms()) {
209+
try {
210+
waitForConfirmsOrDie(0L);
211+
} catch (TimeoutException e) { }
212+
}
213+
214+
/** {@inheritDoc} */
215+
public void waitForConfirmsOrDie(long timeout)
216+
throws IOException, InterruptedException, TimeoutException
217+
{
218+
if (!waitForConfirms(timeout)) {
190219
close(AMQP.REPLY_SUCCESS, "NACKS RECEIVED", true, null, false);
191220
throw new IOException("nacks received");
192221
}

0 commit comments

Comments
 (0)