Skip to content

Commit 11cf3e2

Browse files
author
Alexandru Scvortov
committed
simpler code and update tests
1 parent 8f89a76 commit 11cf3e2

File tree

3 files changed

+22
-35
lines changed

3 files changed

+22
-35
lines changed

src/com/rabbitmq/client/impl/ChannelN.java

Lines changed: 11 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -189,30 +189,12 @@ public ConfirmListener getConfirmListener() {
189189
/** {@inheritDoc} */
190190
public boolean waitForConfirms()
191191
throws IOException, InterruptedException
192-
{
193-
return waitForConfirmsInternal(false);
194-
}
195-
196-
/** {@inheritDoc} */
197-
public void waitForConfirmsOrDie()
198-
throws IOException, InterruptedException
199-
{
200-
waitForConfirmsInternal(true);
201-
}
202-
203-
protected boolean waitForConfirmsInternal(boolean dieOnNack)
204-
throws IOException, InterruptedException
205192
{
206193
synchronized (unconfirmedSet) {
207194
while (true) {
208195
if (getCloseReason() != null) {
209196
throw Utility.fixStackTrace(getCloseReason());
210197
}
211-
if (dieOnNack && nacksReceived) {
212-
close(AMQP.REPLY_SUCCESS, "OK", true,
213-
new RuntimeException("received nack"), false);
214-
throw Utility.fixStackTrace(getCloseReason());
215-
}
216198
if (unconfirmedSet.isEmpty()) {
217199
boolean noNacksReceived = !nacksReceived;
218200
nacksReceived = false;
@@ -223,6 +205,17 @@ protected boolean waitForConfirmsInternal(boolean dieOnNack)
223205
}
224206
}
225207

208+
/** {@inheritDoc} */
209+
public void waitForConfirmsOrDie()
210+
throws IOException, InterruptedException
211+
{
212+
if (!waitForConfirms()) {
213+
close(AMQP.REPLY_SUCCESS, "OK", true,
214+
new RuntimeException("received nack"), false);
215+
throw Utility.fixStackTrace(getCloseReason());
216+
}
217+
}
218+
226219
/**
227220
* Sets the current {@link ConfirmListener}.
228221
* A null argument is interpreted to mean "do not use a confirm listener".

test/src/com/rabbitmq/client/test/functional/Confirm.java

Lines changed: 10 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,7 @@ public void testConfirmQueueDelete()
122122

123123
channel.queueDelete("confirm-test-noconsumer");
124124

125-
waitAcks();
125+
channel.waitForConfirmsOrDie();
126126
}
127127

128128
public void testConfirmQueuePurge()
@@ -132,23 +132,23 @@ public void testConfirmQueuePurge()
132132

133133
channel.queuePurge("confirm-test-noconsumer");
134134

135-
waitAcks();
135+
channel.waitForConfirmsOrDie();
136136
}
137137

138138
public void testConfirmBasicReject()
139139
throws IOException, InterruptedException
140140
{
141141
basicRejectCommon(false);
142142

143-
waitAcks();
143+
channel.waitForConfirmsOrDie();
144144
}
145145

146146
public void testConfirmQueueTTL()
147147
throws IOException, InterruptedException
148148
{
149149
publishN("", "confirm-ttl", true, false, false);
150150

151-
waitAcks();
151+
channel.waitForConfirmsOrDie();
152152
}
153153

154154
public void testConfirmBasicRejectRequeue()
@@ -162,7 +162,7 @@ public void testConfirmBasicRejectRequeue()
162162
channel.basicConsume("confirm-test-noconsumer", true,
163163
new DefaultConsumer(channel));
164164

165-
waitAcks();
165+
channel.waitForConfirmsOrDie();
166166
}
167167

168168
public void testConfirmBasicRecover()
@@ -184,7 +184,7 @@ public void testConfirmBasicRecover()
184184
channel.basicConsume("confirm-test-noconsumer", true,
185185
new DefaultConsumer(channel));
186186

187-
waitAcks();
187+
channel.waitForConfirmsOrDie();
188188
}
189189

190190
public void testConfirmSelect()
@@ -238,7 +238,7 @@ public void handleNack(long seqNo, boolean multiple) {
238238
publish("", "confirm-test", true, false, false);
239239
}
240240

241-
waitAcks();
241+
channel.waitForConfirmsOrDie();
242242
if (!unconfirmedSet.isEmpty()) {
243243
fail("waitForConfirms returned with unconfirmed messages");
244244
}
@@ -250,7 +250,7 @@ public void testWaitForConfirmsNoOp()
250250
channel = connection.createChannel();
251251
// Don't enable Confirm mode
252252
publish("", "confirm-test", true, false, false);
253-
waitAcks(); // Nop
253+
channel.waitForConfirmsOrDie(); // Nop
254254
}
255255

256256
public void testWaitForConfirmsException()
@@ -259,7 +259,7 @@ public void testWaitForConfirmsException()
259259
publishN("", "confirm-test", true, false, false);
260260
channel.close();
261261
try {
262-
waitAcks();
262+
channel.waitForConfirmsOrDie();
263263
fail("waitAcks worked on a closed channel");
264264
} catch (Exception e) {
265265
//whoosh; everything ok
@@ -274,7 +274,7 @@ public void confirmTest(String exchange, String queueName,
274274
{
275275
publishN(exchange, queueName, persistent, mandatory, immediate);
276276

277-
waitAcks();
277+
channel.waitForConfirmsOrDie();
278278
}
279279

280280
private void publishN(String exchangeName, String queueName,
@@ -309,9 +309,4 @@ protected void publish(String exchangeName, String queueName,
309309
: MessageProperties.BASIC,
310310
"nop".getBytes());
311311
}
312-
313-
protected void waitAcks() throws IOException, InterruptedException {
314-
if (!channel.waitForConfirms())
315-
fail("got nacks");
316-
}
317312
}

test/src/com/rabbitmq/client/test/server/MessageRecovery.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,7 @@ public void test() throws IOException, InterruptedException {
3131
channel.basicPublish("", Q, false, false,
3232
MessageProperties.PERSISTENT_BASIC,
3333
"nop".getBytes());
34-
if (!channel.waitForConfirms())
35-
fail("received nacks");
34+
channel.waitForConfirmsOrDie();
3635

3736
restart();
3837
assertDelivered(Q, 1);

0 commit comments

Comments
 (0)