Skip to content

Commit 247be6f

Browse files
author
Simon MacMullen
committed
Add a shorter form of Channel.basicConsume() which takes args, and use it in a few places where it makes sense.
1 parent 62d767f commit 247be6f

File tree

5 files changed

+34
-9
lines changed

5 files changed

+34
-9
lines changed

src/com/rabbitmq/client/Channel.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -622,6 +622,23 @@ void basicNack(long deliveryTag, boolean multiple, boolean requeue)
622622
*/
623623
String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException;
624624

625+
/**
626+
* Start a non-nolocal, non-exclusive consumer, with
627+
* a server-generated consumerTag and specified arguments.
628+
* @param queue the name of the queue
629+
* @param autoAck true if the server should consider messages
630+
* acknowledged once delivered; false if the server should expect
631+
* explicit acknowledgements
632+
* @param arguments a set of arguments for the consume
633+
* @param callback an interface to the consumer object
634+
* @return the consumerTag generated by the server
635+
* @throws java.io.IOException if an error is encountered
636+
* @see com.rabbitmq.client.AMQP.Basic.Consume
637+
* @see com.rabbitmq.client.AMQP.Basic.ConsumeOk
638+
* @see #basicConsume(String, boolean, String, boolean, boolean, Map, Consumer)
639+
*/
640+
String basicConsume(String queue, boolean autoAck, Map<String, Object> arguments, Consumer callback) throws IOException;
641+
625642
/**
626643
* Start a non-nolocal, non-exclusive consumer.
627644
* @param queue the name of the queue

src/com/rabbitmq/client/impl/ChannelN.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -935,6 +935,14 @@ public String basicConsume(String queue, boolean autoAck, Consumer callback)
935935
return basicConsume(queue, autoAck, "", callback);
936936
}
937937

938+
/** Public API - {@inheritDoc} */
939+
public String basicConsume(String queue, boolean autoAck, Map<String, Object> arguments,
940+
Consumer callback)
941+
throws IOException
942+
{
943+
return basicConsume(queue, autoAck, "", false, false, arguments, callback);
944+
}
945+
938946
/** Public API - {@inheritDoc} */
939947
public String basicConsume(String queue, boolean autoAck, String consumerTag,
940948
Consumer callback)

test/src/com/rabbitmq/client/test/functional/ConsumerPriorities.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ private void assertFailValidation(Map<String, Object> args) throws IOException {
2323
Channel ch = connection.createChannel();
2424
String queue = ch.queueDeclare().getQueue();
2525
try {
26-
ch.basicConsume(queue, true, "", false, false, args, new QueueingConsumer(ch));
26+
ch.basicConsume(queue, true, args, new QueueingConsumer(ch));
2727
fail("Validation should fail for " + args);
2828
} catch (IOException ioe) {
2929
checkShutdownSignal(AMQP.PRECONDITION_FAILED, ioe);
@@ -37,14 +37,14 @@ public void testConsumerPriorities() throws Exception {
3737
QueueingConsumer highConsumer = new QueueingConsumer(channel);
3838
QueueingConsumer medConsumer = new QueueingConsumer(channel);
3939
QueueingConsumer lowConsumer = new QueueingConsumer(channel);
40-
channel.basicConsume(queue, true, "high", false, false, args(1), highConsumer);
41-
channel.basicConsume(queue, true, "med", false, false, null, medConsumer);
42-
channel.basicConsume(queue, true, "low", false, false, args(-1), lowConsumer);
40+
String high = channel.basicConsume(queue, true, args(1), highConsumer);
41+
String med = channel.basicConsume(queue, true, medConsumer);
42+
channel.basicConsume(queue, true, args(-1), lowConsumer);
4343

4444
publish(queue, COUNT, "high");
45-
channel.basicCancel("high");
45+
channel.basicCancel(high);
4646
publish(queue, COUNT, "med");
47-
channel.basicCancel("med");
47+
channel.basicCancel(med);
4848
publish(queue, COUNT, "low");
4949

5050
assertContents(highConsumer, COUNT, "high");

test/src/com/rabbitmq/client/test/functional/PerConsumerPrefetch.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ private void publish(String q, int n) throws IOException {
109109
}
110110

111111
private void consume(QueueingConsumer c, int prefetch, boolean autoAck) throws IOException {
112-
channel.basicConsume(q, autoAck, "", false, false, args(prefetch), c);
112+
channel.basicConsume(q, autoAck, args(prefetch), c);
113113
}
114114

115115
private void ack(Delivery del, boolean multi) throws IOException {

test/src/com/rabbitmq/examples/perf/Consumer.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ public void run() {
6666

6767
try {
6868
q = new QueueingConsumer(channel);
69-
channel.basicConsume(queueName, autoAck, "", false, false, args(prefetch), q);
69+
channel.basicConsume(queueName, autoAck, args(prefetch), q);
7070

7171
while ((timeLimit == 0 || now < startTime + timeLimit) &&
7272
(msgLimit == 0 || totalMsgCount < msgLimit)) {
@@ -81,7 +81,7 @@ public void run() {
8181
} catch (ConsumerCancelledException e) {
8282
System.out.println("Consumer cancelled by broker. Re-consuming.");
8383
q = new QueueingConsumer(channel);
84-
channel.basicConsume(queueName, autoAck, "", false, false, args(prefetch), q);
84+
channel.basicConsume(queueName, autoAck, args(prefetch), q);
8585
continue;
8686
}
8787
totalMsgCount++;

0 commit comments

Comments
 (0)