Skip to content

Commit 8c49f0e

Browse files
author
Simon MacMullen
committed
Remove the extra convenience method for basic.consume which is no longer especially convenient, and fix the interaction of recovering channels with new basic.qos.
1 parent 3891bfb commit 8c49f0e

File tree

4 files changed

+24
-37
lines changed

4 files changed

+24
-37
lines changed

src/com/rabbitmq/client/Channel.java

Lines changed: 0 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -628,23 +628,6 @@ void basicNack(long deliveryTag, boolean multiple, boolean requeue)
628628
*/
629629
String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException;
630630

631-
/**
632-
* Start a non-nolocal, non-exclusive consumer, with
633-
* a server-generated consumerTag and specified arguments.
634-
* @param queue the name of the queue
635-
* @param autoAck true if the server should consider messages
636-
* acknowledged once delivered; false if the server should expect
637-
* explicit acknowledgements
638-
* @param arguments a set of arguments for the consume
639-
* @param callback an interface to the consumer object
640-
* @return the consumerTag generated by the server
641-
* @throws java.io.IOException if an error is encountered
642-
* @see com.rabbitmq.client.AMQP.Basic.Consume
643-
* @see com.rabbitmq.client.AMQP.Basic.ConsumeOk
644-
* @see #basicConsume(String, boolean, String, boolean, boolean, Map, Consumer)
645-
*/
646-
String basicConsume(String queue, boolean autoAck, Map<String, Object> arguments, Consumer callback) throws IOException;
647-
648631
/**
649632
* Start a non-nolocal, non-exclusive consumer.
650633
* @param queue the name of the queue

src/com/rabbitmq/client/impl/ChannelN.java

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -946,14 +946,6 @@ public String basicConsume(String queue, boolean autoAck, Consumer callback)
946946
return basicConsume(queue, autoAck, "", callback);
947947
}
948948

949-
/** Public API - {@inheritDoc} */
950-
public String basicConsume(String queue, boolean autoAck, Map<String, Object> arguments,
951-
Consumer callback)
952-
throws IOException
953-
{
954-
return basicConsume(queue, autoAck, "", false, false, arguments, callback);
955-
}
956-
957949
/** Public API - {@inheritDoc} */
958950
public String basicConsume(String queue, boolean autoAck, String consumerTag,
959951
Consumer callback)

src/com/rabbitmq/client/impl/recovery/AutorecoveringChannel.java

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,8 @@ public class AutorecoveringChannel implements Channel, Recoverable {
3434
private List<ReturnListener> returnListeners = new ArrayList<ReturnListener>();
3535
private List<ConfirmListener> confirmListeners = new ArrayList<ConfirmListener>();
3636
private List<FlowListener> flowListeners = new ArrayList<FlowListener>();
37-
private int prefetchCount;
38-
private boolean globalQos;
37+
private int prefetchCountConsumer;
38+
private int prefetchCountGlobal;
3939
private boolean usesPublisherConfirms;
4040
private boolean usesTransactions;
4141

@@ -138,16 +138,23 @@ public void setDefaultConsumer(Consumer consumer) {
138138
}
139139

140140
public void basicQos(int prefetchSize, int prefetchCount, boolean global) throws IOException {
141-
this.prefetchCount = prefetchCount;
142-
this.globalQos = global;
141+
if (global) {
142+
this.prefetchCountGlobal = prefetchCount;
143+
} else {
144+
this.prefetchCountConsumer = prefetchCount;
145+
}
146+
143147
delegate.basicQos(prefetchSize, prefetchCount, global);
144148
}
145149

146150
public void basicQos(int prefetchCount) throws IOException {
147-
148151
basicQos(0, prefetchCount, false);
149152
}
150153

154+
public void basicQos(int prefetchCount, boolean global) throws IOException {
155+
basicQos(0, prefetchCount, global);
156+
}
157+
151158
public void basicPublish(String exchange, String routingKey, AMQP.BasicProperties props, byte[] body) throws IOException {
152159
delegate.basicPublish(exchange, routingKey, props, body);
153160
}
@@ -432,7 +439,12 @@ private void recoverFlowListeners() {
432439
}
433440

434441
private void recoverState() throws IOException {
435-
basicQos(0, this.prefetchCount, this.globalQos);
442+
if (this.prefetchCountConsumer != 0) {
443+
basicQos(this.prefetchCountConsumer, false);
444+
}
445+
if (this.prefetchCountGlobal != 0) {
446+
basicQos(this.prefetchCountGlobal, true);
447+
}
436448
if(this.usesPublisherConfirms) {
437449
this.confirmSelect();
438450
}

test/src/com/rabbitmq/client/test/functional/ConsumerPriorities.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ private void assertFailValidation(Map<String, Object> args) throws IOException {
2323
Channel ch = connection.createChannel();
2424
String queue = ch.queueDeclare().getQueue();
2525
try {
26-
ch.basicConsume(queue, true, args, new QueueingConsumer(ch));
26+
ch.basicConsume(queue, true, "", false, false, args, new QueueingConsumer(ch));
2727
fail("Validation should fail for " + args);
2828
} catch (IOException ioe) {
2929
checkShutdownSignal(AMQP.PRECONDITION_FAILED, ioe);
@@ -37,14 +37,14 @@ public void testConsumerPriorities() throws Exception {
3737
QueueingConsumer highConsumer = new QueueingConsumer(channel);
3838
QueueingConsumer medConsumer = new QueueingConsumer(channel);
3939
QueueingConsumer lowConsumer = new QueueingConsumer(channel);
40-
String high = channel.basicConsume(queue, true, args(1), highConsumer);
41-
String med = channel.basicConsume(queue, true, medConsumer);
42-
channel.basicConsume(queue, true, args(-1), lowConsumer);
40+
channel.basicConsume(queue, true, "high", false, false, args(1), highConsumer);
41+
channel.basicConsume(queue, true, "med", false, false, null, medConsumer);
42+
channel.basicConsume(queue, true, "low", false, false, args(-1), lowConsumer);
4343

4444
publish(queue, COUNT, "high");
45-
channel.basicCancel(high);
45+
channel.basicCancel("high");
4646
publish(queue, COUNT, "med");
47-
channel.basicCancel(med);
47+
channel.basicCancel("med");
4848
publish(queue, COUNT, "low");
4949

5050
assertContents(highConsumer, COUNT, "high");

0 commit comments

Comments
 (0)