Skip to content

Commit 077f66f

Browse files
committed
Remove some QueueingConsumer usages in tests
References #213
1 parent 415b5eb commit 077f66f

File tree

5 files changed

+119
-74
lines changed

5 files changed

+119
-74
lines changed

src/test/java/com/rabbitmq/client/test/ClientTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@
3333
Bug20004Test.class,
3434
CloseInMainLoop.class,
3535
ChannelNumberAllocationTests.class,
36-
QueueingConsumerShutdownTests.class,
36+
QueueingConsumerTests.class,
3737
MultiThreadedChannel.class,
3838
IntAllocatorTests.class,
3939
AMQBuilderApiTest.class,

src/test/java/com/rabbitmq/client/test/QueueingConsumerShutdownTests.java renamed to src/test/java/com/rabbitmq/client/test/QueueingConsumerTests.java

Lines changed: 38 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,13 @@
1616
package com.rabbitmq.client.test;
1717

1818
import static org.junit.Assert.*;
19-
import org.junit.Test;
2019

20+
import com.rabbitmq.client.ConsumerCancelledException;
21+
import org.junit.Test;
2122

23+
import java.io.IOException;
24+
import java.util.concurrent.ArrayBlockingQueue;
25+
import java.util.concurrent.BlockingQueue;
2226
import java.util.concurrent.CountDownLatch;
2327
import java.util.concurrent.TimeUnit;
2428
import java.util.concurrent.atomic.AtomicInteger;
@@ -27,7 +31,7 @@
2731
import com.rabbitmq.client.QueueingConsumer;
2832
import com.rabbitmq.client.ShutdownSignalException;
2933

30-
public class QueueingConsumerShutdownTests extends BrokerTestCase{
34+
public class QueueingConsumerTests extends BrokerTestCase{
3135
static final String QUEUE = "some-queue";
3236
static final int THREADS = 5;
3337

@@ -64,4 +68,36 @@ public class QueueingConsumerShutdownTests extends BrokerTestCase{
6468
assertEquals(0, count.get());
6569
}
6670

71+
@Test public void consumerCancellationInterruptsQueuingConsumerWait()
72+
throws IOException, InterruptedException {
73+
String queue = "cancel_notification_queue_for_queueing_consumer";
74+
final BlockingQueue<Boolean> result = new ArrayBlockingQueue<Boolean>(1);
75+
channel.queueDeclare(queue, false, true, false, null);
76+
final QueueingConsumer consumer = new QueueingConsumer(channel);
77+
Runnable receiver = new Runnable() {
78+
79+
public void run() {
80+
try {
81+
try {
82+
consumer.nextDelivery();
83+
} catch (ConsumerCancelledException e) {
84+
result.put(true);
85+
return;
86+
} catch (ShutdownSignalException e) {
87+
} catch (InterruptedException e) {
88+
}
89+
result.put(false);
90+
} catch (InterruptedException e) {
91+
fail();
92+
}
93+
}
94+
};
95+
Thread t = new Thread(receiver);
96+
t.start();
97+
channel.basicConsume(queue, consumer);
98+
channel.queueDelete(queue);
99+
assertTrue(result.take());
100+
t.join();
101+
}
102+
67103
}

src/test/java/com/rabbitmq/client/test/functional/ConsumerCancelNotification.java

Lines changed: 9 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -15,24 +15,21 @@
1515

1616
package com.rabbitmq.client.test.functional;
1717

18-
import static org.junit.Assert.assertTrue;
19-
import static org.junit.Assert.fail;
18+
import com.rabbitmq.client.Channel;
19+
import com.rabbitmq.client.Consumer;
20+
import com.rabbitmq.client.DefaultConsumer;
21+
import com.rabbitmq.client.ShutdownSignalException;
22+
import com.rabbitmq.client.test.BrokerTestCase;
23+
import org.junit.Test;
2024

2125
import java.io.IOException;
2226
import java.util.concurrent.ArrayBlockingQueue;
2327
import java.util.concurrent.BlockingQueue;
2428
import java.util.concurrent.CountDownLatch;
2529
import java.util.concurrent.TimeUnit;
2630

27-
import org.junit.Test;
28-
29-
import com.rabbitmq.client.Channel;
30-
import com.rabbitmq.client.Consumer;
31-
import com.rabbitmq.client.ConsumerCancelledException;
32-
import com.rabbitmq.client.DefaultConsumer;
33-
import com.rabbitmq.client.QueueingConsumer;
34-
import com.rabbitmq.client.ShutdownSignalException;
35-
import com.rabbitmq.client.test.BrokerTestCase;
31+
import static org.junit.Assert.assertTrue;
32+
import static org.junit.Assert.fail;
3633

3734
public class ConsumerCancelNotification extends BrokerTestCase {
3835

@@ -43,7 +40,7 @@ public class ConsumerCancelNotification extends BrokerTestCase {
4340
final BlockingQueue<Boolean> result = new ArrayBlockingQueue<Boolean>(1);
4441

4542
channel.queueDeclare(queue, false, true, false, null);
46-
Consumer consumer = new QueueingConsumer(channel) {
43+
Consumer consumer = new DefaultConsumer(channel) {
4744
@Override
4845
public void handleCancel(String consumerTag) throws IOException {
4946
try {
@@ -58,38 +55,6 @@ public void handleCancel(String consumerTag) throws IOException {
5855
assertTrue(result.take());
5956
}
6057

61-
@Test public void consumerCancellationInterruptsQueuingConsumerWait()
62-
throws IOException, InterruptedException {
63-
final BlockingQueue<Boolean> result = new ArrayBlockingQueue<Boolean>(1);
64-
channel.queueDeclare(queue, false, true, false, null);
65-
final QueueingConsumer consumer = new QueueingConsumer(channel);
66-
Runnable receiver = new Runnable() {
67-
68-
public void run() {
69-
try {
70-
try {
71-
consumer.nextDelivery();
72-
} catch (ConsumerCancelledException e) {
73-
result.put(true);
74-
return;
75-
} catch (ShutdownSignalException e) {
76-
} catch (InterruptedException e) {
77-
}
78-
result.put(false);
79-
} catch (InterruptedException e) {
80-
fail();
81-
}
82-
}
83-
};
84-
Thread t = new Thread(receiver);
85-
t.start();
86-
channel.basicConsume(queue, consumer);
87-
channel.queueDelete(queue);
88-
assertTrue(result.take());
89-
t.join();
90-
}
91-
92-
9358
class AlteringConsumer extends DefaultConsumer {
9459
private final String altQueue;
9560
private final CountDownLatch latch;

src/test/java/com/rabbitmq/client/test/functional/ConsumerPriorities.java

Lines changed: 40 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -15,21 +15,20 @@
1515

1616
package com.rabbitmq.client.test.functional;
1717

18-
import static org.junit.Assert.assertEquals;
19-
import static org.junit.Assert.fail;
18+
import com.rabbitmq.client.*;
19+
import com.rabbitmq.client.test.BrokerTestCase;
20+
import org.junit.Test;
2021

2122
import java.io.IOException;
2223
import java.util.Arrays;
2324
import java.util.HashMap;
2425
import java.util.Map;
26+
import java.util.concurrent.BlockingQueue;
27+
import java.util.concurrent.LinkedBlockingQueue;
28+
import java.util.concurrent.TimeUnit;
2529

26-
import org.junit.Test;
27-
28-
import com.rabbitmq.client.AMQP;
29-
import com.rabbitmq.client.Channel;
30-
import com.rabbitmq.client.MessageProperties;
31-
import com.rabbitmq.client.QueueingConsumer;
32-
import com.rabbitmq.client.test.BrokerTestCase;
30+
import static org.junit.Assert.assertEquals;
31+
import static org.junit.Assert.fail;
3332

3433
public class ConsumerPriorities extends BrokerTestCase {
3534
@Test public void validation() throws IOException {
@@ -43,7 +42,7 @@ private void assertFailValidation(Map<String, Object> args) throws IOException {
4342
Channel ch = connection.createChannel();
4443
String queue = ch.queueDeclare().getQueue();
4544
try {
46-
ch.basicConsume(queue, true, args, new QueueingConsumer(ch));
45+
ch.basicConsume(queue, true, args, new DefaultConsumer(ch));
4746
fail("Validation should fail for " + args);
4847
} catch (IOException ioe) {
4948
checkShutdownSignal(AMQP.PRECONDITION_FAILED, ioe);
@@ -54,9 +53,9 @@ private void assertFailValidation(Map<String, Object> args) throws IOException {
5453

5554
@Test public void consumerPriorities() throws Exception {
5655
String queue = channel.queueDeclare().getQueue();
57-
QueueingConsumer highConsumer = new QueueingConsumer(channel);
58-
QueueingConsumer medConsumer = new QueueingConsumer(channel);
59-
QueueingConsumer lowConsumer = new QueueingConsumer(channel);
56+
QueueMessageConsumer highConsumer = new QueueMessageConsumer(channel);
57+
QueueMessageConsumer medConsumer = new QueueMessageConsumer(channel);
58+
QueueMessageConsumer lowConsumer = new QueueMessageConsumer(channel);
6059
String high = channel.basicConsume(queue, true, args(1), highConsumer);
6160
String med = channel.basicConsume(queue, true, medConsumer);
6261
channel.basicConsume(queue, true, args(-1), lowConsumer);
@@ -78,17 +77,41 @@ private Map<String, Object> args(Object o) {
7877
return map;
7978
}
8079

81-
private void assertContents(QueueingConsumer qc, int count, String msg) throws InterruptedException {
80+
private void assertContents(QueueMessageConsumer c, int count, String msg) throws InterruptedException {
8281
for (int i = 0; i < count; i++) {
83-
QueueingConsumer.Delivery d = qc.nextDelivery();
84-
assertEquals(msg, new String(d.getBody()));
82+
byte[] body = c.nextDelivery(100);
83+
assertEquals(msg, new String(body));
8584
}
86-
assertEquals(null, qc.nextDelivery(0));
85+
assertEquals(null, c.nextDelivery());
8786
}
8887

8988
private void publish(String queue, int count, String msg) throws IOException {
9089
for (int i = 0; i < count; i++) {
9190
channel.basicPublish("", queue, MessageProperties.MINIMAL_BASIC, msg.getBytes());
9291
}
9392
}
93+
94+
class QueueMessageConsumer extends DefaultConsumer {
95+
96+
BlockingQueue<byte[]> messages = new LinkedBlockingQueue<byte[]>();
97+
98+
public QueueMessageConsumer(Channel channel) {
99+
super(channel);
100+
}
101+
102+
@Override
103+
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
104+
messages.add(body);
105+
}
106+
107+
byte[] nextDelivery() {
108+
return messages.poll();
109+
}
110+
111+
byte[] nextDelivery(long timeoutInMs) throws InterruptedException {
112+
return messages.poll(timeoutInMs, TimeUnit.MILLISECONDS);
113+
}
114+
115+
}
116+
94117
}

src/test/java/com/rabbitmq/client/test/functional/DeadLetterExchange.java

Lines changed: 31 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,7 @@
2323

2424
import java.io.IOException;
2525
import java.util.*;
26-
import java.util.concurrent.Callable;
27-
import java.util.concurrent.CountDownLatch;
28-
import java.util.concurrent.TimeUnit;
26+
import java.util.concurrent.*;
2927

3028
import static org.junit.Assert.*;
3129

@@ -173,13 +171,13 @@ protected void releaseResources() throws IOException {
173171
channel.queueBind(DLQ, DLX, "test");
174172

175173
//measure round-trip latency
176-
QueueingConsumer c = new QueueingConsumer(channel);
174+
QueueMessageConsumer c = new QueueMessageConsumer(channel);
177175
String cTag = channel.basicConsume(TEST_QUEUE_NAME, true, c);
178176
long start = System.currentTimeMillis();
179177
publish(null, "test");
180-
Delivery d = c.nextDelivery(TTL);
178+
byte[] body = c.nextDelivery(TTL);
181179
long stop = System.currentTimeMillis();
182-
assertNotNull(d);
180+
assertNotNull(body);
183181
channel.basicCancel(cTag);
184182
long latency = stop-start;
185183

@@ -565,14 +563,14 @@ private void sleep(long millis) {
565563

566564
/* check that each message arrives within epsilon of the
567565
publication time + TTL + latency */
568-
private void checkPromptArrival(QueueingConsumer c,
566+
private void checkPromptArrival(QueueMessageConsumer c,
569567
int count, long latency) throws Exception {
570568
long epsilon = TTL / 10;
571569
for (int i = 0; i < count; i++) {
572-
Delivery d = c.nextDelivery(TTL + TTL + latency + epsilon);
573-
assertNotNull("message #" + i + " did not expire", d);
570+
byte[] body = c.nextDelivery(TTL + TTL + latency + epsilon);
571+
assertNotNull("message #" + i + " did not expire", body);
574572
long now = System.currentTimeMillis();
575-
long publishTime = Long.valueOf(new String(d.getBody()));
573+
long publishTime = Long.valueOf(new String(body));
576574
long targetTime = publishTime + TTL + latency;
577575
assertTrue("expiry outside bounds (+/- " + epsilon + "): " +
578576
(now - targetTime),
@@ -699,4 +697,27 @@ public void process(GetResponse getResponse) {
699697
private static String randomQueueName() {
700698
return DeadLetterExchange.class.getSimpleName() + "-" + UUID.randomUUID().toString();
701699
}
700+
701+
class QueueMessageConsumer extends DefaultConsumer {
702+
703+
BlockingQueue<byte[]> messages = new LinkedBlockingQueue<byte[]>();
704+
705+
public QueueMessageConsumer(Channel channel) {
706+
super(channel);
707+
}
708+
709+
@Override
710+
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
711+
messages.add(body);
712+
}
713+
714+
byte[] nextDelivery() {
715+
return messages.poll();
716+
}
717+
718+
byte[] nextDelivery(long timeoutInMs) throws InterruptedException {
719+
return messages.poll(timeoutInMs, TimeUnit.MILLISECONDS);
720+
}
721+
722+
}
702723
}

0 commit comments

Comments
 (0)