Skip to content

Commit fabe315

Browse files
author
Steve Powell
committed
Remove GRATUITOUS_WAIT and add more aggressive Requeue/Consumer tests.
1 parent 79d27e5 commit fabe315

File tree

4 files changed

+134
-33
lines changed

4 files changed

+134
-33
lines changed

src/com/rabbitmq/client/impl/ChannelN.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -968,14 +968,14 @@ public Consumer transformReply(AMQCommand replyCommand) {
968968
}
969969

970970

971-
/** Public API - {@inheritDoc} */
971+
/** Public API - {@inheritDoc} */
972972
public Basic.RecoverOk basicRecover()
973973
throws IOException
974974
{
975975
return basicRecover(true);
976976
}
977977

978-
/** Public API - {@inheritDoc} */
978+
/** Public API - {@inheritDoc} */
979979
public Basic.RecoverOk basicRecover(boolean requeue)
980980
throws IOException
981981
{

src/com/rabbitmq/client/impl/ConsumerDispatcher.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ final class ConsumerDispatcher {
4545
private volatile boolean shutdownConsumersDriven = false;
4646
private volatile CountDownLatch shutdownConsumersComplete;
4747

48-
private volatile ShutdownSignalException shutdownSignal;
48+
private volatile ShutdownSignalException shutdownSignal = null;
4949

5050
public ConsumerDispatcher(AMQConnection connection,
5151
Channel channel,

src/com/rabbitmq/client/impl/ConsumerWorkService.java

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -30,13 +30,9 @@ final class ConsumerWorkService {
3030
private final WorkPool<Channel, Runnable> workPool;
3131

3232
public ConsumerWorkService(ExecutorService executor) {
33-
if (executor == null) {
34-
privateExecutor = true;
35-
this.executor = Executors.newFixedThreadPool(DEFAULT_NUM_THREADS);
36-
} else {
37-
privateExecutor = false;
38-
this.executor = executor;
39-
}
33+
this.privateExecutor = (executor == null);
34+
this.executor = (executor == null) ? Executors.newFixedThreadPool(DEFAULT_NUM_THREADS)
35+
: executor;
4036
this.workPool = new WorkPool<Channel, Runnable>();
4137
}
4238

test/src/com/rabbitmq/client/test/functional/RequeueOnClose.java

Lines changed: 128 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -18,16 +18,24 @@
1818

1919
import com.rabbitmq.client.test.BrokerTestCase;
2020
import java.io.IOException;
21+
import java.util.concurrent.CountDownLatch;
2122

23+
import com.rabbitmq.client.AMQP;
24+
import com.rabbitmq.client.Channel;
25+
import com.rabbitmq.client.DefaultConsumer;
26+
import com.rabbitmq.client.Envelope;
2227
import com.rabbitmq.client.GetResponse;
2328
import com.rabbitmq.client.QueueingConsumer;
2429
import com.rabbitmq.client.ShutdownSignalException;
2530

31+
/**
32+
* Test Requeue of messages on different types of close.
33+
* Methods {@link #open} and {@link #close} must be implemented by a concrete subclass.
34+
*/
2635
public abstract class RequeueOnClose
2736
extends BrokerTestCase
2837
{
2938
private static final String Q = "RequeueOnClose";
30-
private static final int GRATUITOUS_DELAY = 100;
3139
private static final int MESSAGE_COUNT = 2000;
3240

3341
protected abstract void open() throws IOException;
@@ -46,7 +54,7 @@ protected void tearDown()
4654
// Override to disable the default behaviour from BrokerTestCase.
4755
}
4856

49-
public void injectMessage()
57+
private void injectMessage()
5058
throws IOException
5159
{
5260
channel.queueDeclare(Q, false, false, false, null);
@@ -55,13 +63,13 @@ public void injectMessage()
5563
channel.basicPublish("", Q, null, "RequeueOnClose message".getBytes());
5664
}
5765

58-
public GetResponse getMessage()
66+
private GetResponse getMessage()
5967
throws IOException
6068
{
6169
return channel.basicGet(Q, false);
6270
}
6371

64-
public void publishAndGet(int count, boolean doAck)
72+
private void publishAndGet(int count, boolean doAck)
6573
throws IOException, InterruptedException
6674
{
6775
openConnection();
@@ -71,33 +79,40 @@ public void publishAndGet(int count, boolean doAck)
7179
GetResponse r1 = getMessage();
7280
if (doAck) channel.basicAck(r1.getEnvelope().getDeliveryTag(), false);
7381
close();
74-
Thread.sleep(GRATUITOUS_DELAY);
7582
open();
76-
GetResponse r2 = getMessage();
77-
if (doAck && r2 != null) {
78-
fail("Expected missing second basicGet (repeat="+repeat+")");
79-
} else if (!doAck && r2 == null) {
80-
fail("Expected present second basicGet (repeat="+repeat+")");
83+
if (doAck) {
84+
assertNull("Expected missing second basicGet (repeat="+repeat+")", getMessage());
85+
} else {
86+
assertNotNull("Expected present second basicGet (repeat="+repeat+")", getMessage());
8187
}
8288
close();
8389
}
8490
closeConnection();
8591
}
8692

87-
public void testNormal()
88-
throws IOException, InterruptedException
93+
/**
94+
* Test we don't requeue acknowledged messages (using get)
95+
* @throws Exception test
96+
*/
97+
public void testNormal() throws Exception
8998
{
9099
publishAndGet(3, true);
91100
}
92101

93-
public void testRequeueing()
94-
throws IOException, InterruptedException
102+
/**
103+
* Test we requeue unacknowledged messages (using get)
104+
* @throws Exception test
105+
*/
106+
public void testRequeueing() throws Exception
95107
{
96108
publishAndGet(3, false);
97109
}
98110

99-
public void testRequeueingConsumer()
100-
throws IOException, InterruptedException, ShutdownSignalException
111+
/**
112+
* Test we requeue unacknowledged message (using consumer)
113+
* @throws Exception test
114+
*/
115+
public void testRequeueingConsumer() throws Exception
101116
{
102117
openConnection();
103118
open();
@@ -106,14 +121,13 @@ public void testRequeueingConsumer()
106121
channel.basicConsume(Q, c);
107122
c.nextDelivery();
108123
close();
109-
Thread.sleep(GRATUITOUS_DELAY);
110124
open();
111125
assertNotNull(getMessage());
112126
close();
113127
closeConnection();
114128
}
115129

116-
public void publishLotsAndGet()
130+
private void publishLotsAndGet()
117131
throws IOException, InterruptedException, ShutdownSignalException
118132
{
119133
openConnection();
@@ -130,22 +144,113 @@ public void publishLotsAndGet()
130144
close();
131145
open();
132146
for (int i = 0; i < MESSAGE_COUNT; i++) {
133-
GetResponse r = channel.basicGet(Q, true);
134147
assertNotNull("only got " + i + " out of " + MESSAGE_COUNT +
135-
" messages", r);
148+
" messages", channel.basicGet(Q, true));
136149
}
137-
assertNull(channel.basicGet(Q, true));
150+
assertNull("got more messages than " + MESSAGE_COUNT + " expected", channel.basicGet(Q, true));
138151
channel.queueDelete(Q);
139152
close();
140153
closeConnection();
141154
}
142155

143-
public void testRequeueInFlight()
144-
throws IOException, InterruptedException, ShutdownSignalException
156+
/**
157+
* Test close while consuming many messages successfully requeues unacknowledged messages
158+
* @throws Exception test
159+
*/
160+
public void testRequeueInFlight() throws Exception
145161
{
146162
for (int i = 0; i < 5; i++) {
147163
publishLotsAndGet();
148164
}
149165
}
150166

167+
/**
168+
* Test close while consuming partially not acked with cancel successfully requeues unacknowledged messages
169+
* @throws Exception test
170+
*/
171+
public void testRequeueInFlightConsumerNoAck() throws Exception
172+
{
173+
for (int i = 0; i < 5; i++) {
174+
publishLotsAndConsumeSome(false);
175+
}
176+
}
177+
178+
/**
179+
* Test close while consuming partially acked with cancel successfully requeues unacknowledged messages
180+
* @throws Exception test
181+
*/
182+
public void testRequeueInFlightConsumerAck() throws Exception
183+
{
184+
for (int i = 0; i < 5; i++) {
185+
publishLotsAndConsumeSome(true);
186+
}
187+
}
188+
189+
private static final int MESSAGES_TO_CONSUME = 20;
190+
191+
private void publishLotsAndConsumeSome(boolean ack)
192+
throws IOException, InterruptedException, ShutdownSignalException
193+
{
194+
openConnection();
195+
open();
196+
channel.queueDeclare(Q, false, false, false, null);
197+
channel.queueDelete(Q);
198+
channel.queueDeclare(Q, false, false, false, null);
199+
for (int i = 0; i < MESSAGE_COUNT; i++) {
200+
channel.basicPublish("", Q, null, "in flight message".getBytes());
201+
}
202+
203+
CountDownLatch latch = new CountDownLatch(1);
204+
PartialConsumer c = new PartialConsumer(channel, MESSAGES_TO_CONSUME, ack, latch);
205+
channel.basicConsume(Q, c);
206+
latch.await(); // wait for consumer
207+
208+
close();
209+
open();
210+
int requeuedMsgCount = (ack) ? MESSAGE_COUNT - MESSAGES_TO_CONSUME : MESSAGE_COUNT;
211+
for (int i = 0; i < requeuedMsgCount; i++) {
212+
assertNotNull("only got " + i + " out of " + requeuedMsgCount + " messages",
213+
channel.basicGet(Q, true));
214+
}
215+
int countMoreMsgs = 0;
216+
while (null != channel.basicGet(Q, true)) {
217+
countMoreMsgs++;
218+
}
219+
assertTrue("got " + countMoreMsgs + " more messages than " + requeuedMsgCount + " expected", 0==countMoreMsgs);
220+
channel.queueDelete(Q);
221+
close();
222+
closeConnection();
223+
}
224+
225+
private class PartialConsumer extends DefaultConsumer {
226+
227+
private volatile int count;
228+
private Channel channel;
229+
private CountDownLatch latch;
230+
private volatile boolean acknowledge;
231+
232+
public PartialConsumer(Channel channel, int count, boolean acknowledge, CountDownLatch latch) {
233+
super(channel);
234+
this.count = count;
235+
this.channel = channel;
236+
this.latch = latch;
237+
this.acknowledge = acknowledge;
238+
}
239+
240+
@Override
241+
public void handleDelivery(String consumerTag,
242+
Envelope envelope,
243+
AMQP.BasicProperties properties,
244+
byte[] body)
245+
throws IOException
246+
{
247+
if (this.acknowledge)
248+
this.channel.basicAck(envelope.getDeliveryTag(), false);
249+
if (--this.count == 0) {
250+
this.channel.basicCancel(this.getConsumerTag());
251+
this.acknowledge = false; // don't acknowledge any more
252+
this.latch.countDown();
253+
}
254+
}
255+
}
151256
}

0 commit comments

Comments
 (0)