Skip to content

Commit 4da2185

Browse files
merge default into bug26008
2 parents c1d76e8 + fb57269 commit 4da2185

File tree

16 files changed

+318
-77
lines changed

16 files changed

+318
-77
lines changed

src/com/rabbitmq/client/Channel.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -215,11 +215,24 @@ public interface Channel extends ShutdownNotifier {
215215
* @param prefetchCount maximum number of messages that the server
216216
* will deliver, 0 if unlimited
217217
* @param global true if the settings should be applied to the
218-
* entire connection rather than just the current channel
218+
* entire channel rather than each consumer
219219
* @throws java.io.IOException if an error is encountered
220220
*/
221221
void basicQos(int prefetchSize, int prefetchCount, boolean global) throws IOException;
222222

223+
/**
224+
* Request a specific prefetchCount "quality of service" settings
225+
* for this channel.
226+
*
227+
* @see #basicQos(int, int, boolean)
228+
* @param prefetchCount maximum number of messages that the server
229+
* will deliver, 0 if unlimited
230+
* @param global true if the settings should be applied to the
231+
* entire channel rather than each consumer
232+
* @throws java.io.IOException if an error is encountered
233+
*/
234+
void basicQos(int prefetchCount, boolean global) throws IOException;
235+
223236
/**
224237
* Request a specific prefetchCount "quality of service" settings
225238
* for this channel.

src/com/rabbitmq/client/Envelope.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,11 @@ public long getDeliveryTag() {
4848
}
4949

5050
/**
51-
* Get the redelivery flag included in this parameter envelope
51+
* Get the redelivery flag included in this parameter envelope. This is a
52+
* hint as to whether this message may have been delivered before (but not
53+
* acknowledged). If the flag is not set, the message definitely has not
54+
* been delivered before. If it is set, it may have been delivered before.
55+
*
5256
* @return the redelivery flag
5357
*/
5458
public boolean isRedeliver() {

src/com/rabbitmq/client/impl/AMQConnection.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -173,7 +173,6 @@ public InetAddress getAddress() {
173173
return _frameHandler.getAddress();
174174
}
175175

176-
@Override
177176
public InetAddress getLocalAddress() {
178177
return _frameHandler.getLocalAddress();
179178
}
@@ -183,7 +182,6 @@ public int getPort() {
183182
return _frameHandler.getPort();
184183
}
185184

186-
@Override
187185
public int getLocalPort() {
188186
return _frameHandler.getLocalPort();
189187
}

src/com/rabbitmq/client/impl/ChannelN.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -599,6 +599,13 @@ public void basicQos(int prefetchSize, int prefetchCount, boolean global)
599599
exnWrappingRpc(new Basic.Qos(prefetchSize, prefetchCount, global));
600600
}
601601

602+
/** Public API - {@inheritDoc} */
603+
public void basicQos(int prefetchCount, boolean global)
604+
throws IOException
605+
{
606+
basicQos(0, prefetchCount, global);
607+
}
608+
602609
/** Public API - {@inheritDoc} */
603610
public void basicQos(int prefetchCount)
604611
throws IOException

src/com/rabbitmq/client/impl/recovery/AutorecoveringChannel.java

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,8 @@ public class AutorecoveringChannel implements Channel, Recoverable {
3434
private List<ReturnListener> returnListeners = new ArrayList<ReturnListener>();
3535
private List<ConfirmListener> confirmListeners = new ArrayList<ConfirmListener>();
3636
private List<FlowListener> flowListeners = new ArrayList<FlowListener>();
37-
private int prefetchCount;
38-
private boolean globalQos;
37+
private int prefetchCountConsumer;
38+
private int prefetchCountGlobal;
3939
private boolean usesPublisherConfirms;
4040
private boolean usesTransactions;
4141

@@ -72,7 +72,6 @@ public void close(int closeCode, String closeMessage) throws IOException {
7272
}
7373
}
7474

75-
@Override
7675
public boolean flowBlocked() {
7776
return delegate.flowBlocked();
7877
}
@@ -139,16 +138,23 @@ public void setDefaultConsumer(Consumer consumer) {
139138
}
140139

141140
public void basicQos(int prefetchSize, int prefetchCount, boolean global) throws IOException {
142-
this.prefetchCount = prefetchCount;
143-
this.globalQos = global;
141+
if (global) {
142+
this.prefetchCountGlobal = prefetchCount;
143+
} else {
144+
this.prefetchCountConsumer = prefetchCount;
145+
}
146+
144147
delegate.basicQos(prefetchSize, prefetchCount, global);
145148
}
146149

147150
public void basicQos(int prefetchCount) throws IOException {
148-
149151
basicQos(0, prefetchCount, false);
150152
}
151153

154+
public void basicQos(int prefetchCount, boolean global) throws IOException {
155+
basicQos(0, prefetchCount, global);
156+
}
157+
152158
public void basicPublish(String exchange, String routingKey, AMQP.BasicProperties props, byte[] body) throws IOException {
153159
delegate.basicPublish(exchange, routingKey, props, body);
154160
}
@@ -433,7 +439,12 @@ private void recoverFlowListeners() {
433439
}
434440

435441
private void recoverState() throws IOException {
436-
basicQos(0, this.prefetchCount, this.globalQos);
442+
if (this.prefetchCountConsumer != 0) {
443+
basicQos(this.prefetchCountConsumer, false);
444+
}
445+
if (this.prefetchCountGlobal != 0) {
446+
basicQos(this.prefetchCountGlobal, true);
447+
}
437448
if(this.usesPublisherConfirms) {
438449
this.confirmSelect();
439450
}

test/src/com/rabbitmq/client/test/AMQConnectionTest.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -207,12 +207,10 @@ public void flush() throws IOException {
207207
// no need to implement this: don't bother writing the frame
208208
}
209209

210-
@Override
211210
public InetAddress getLocalAddress() {
212211
return null;
213212
}
214213

215-
@Override
216214
public int getLocalPort() {
217215
return -1;
218216
}
@@ -259,7 +257,6 @@ public void handleChannelRecoveryException(Channel ch, Throwable ex) {
259257
_handledExceptions.add(ex);
260258
}
261259

262-
@Override
263260
public void handleTopologyRecoveryException(Connection conn, Channel ch, TopologyRecoveryException ex) {
264261
_handledExceptions.add(ex);
265262
}

test/src/com/rabbitmq/client/test/BrokenFramesTest.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -158,12 +158,10 @@ public void flush() throws IOException {
158158
// no need to implement this: don't bother writing the frame
159159
}
160160

161-
@Override
162161
public InetAddress getLocalAddress() {
163162
return null;
164163
}
165164

166-
@Override
167165
public int getLocalPort() {
168166
return -1;
169167
}

test/src/com/rabbitmq/client/test/functional/ConnectionRecovery.java

Lines changed: 3 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,6 @@ public void testConnectionRecoveryWithDisabledTopologyRecovery() throws IOExcept
6969
public void testShutdownHooksRecovery() throws IOException, InterruptedException {
7070
final CountDownLatch latch = new CountDownLatch(2);
7171
connection.addShutdownListener(new ShutdownListener() {
72-
@Override
7372
public void shutdownCompleted(ShutdownSignalException cause) {
7473
latch.countDown();
7574
}
@@ -84,12 +83,10 @@ public void shutdownCompleted(ShutdownSignalException cause) {
8483
public void testBlockedListenerRecovery() throws IOException, InterruptedException {
8584
final CountDownLatch latch = new CountDownLatch(2);
8685
connection.addBlockedListener(new BlockedListener() {
87-
@Override
8886
public void handleBlocked(String reason) throws IOException {
8987
latch.countDown();
9088
}
9189

92-
@Override
9390
public void handleUnblocked() throws IOException {
9491
latch.countDown();
9592
}
@@ -115,7 +112,6 @@ public void testChannelRecovery() throws IOException, InterruptedException {
115112
public void testReturnListenerRecovery() throws IOException, InterruptedException {
116113
final CountDownLatch latch = new CountDownLatch(1);
117114
channel.addReturnListener(new ReturnListener() {
118-
@Override
119115
public void handleReturn(int replyCode, String replyText, String exchange,
120116
String routingKey, AMQP.BasicProperties properties,
121117
byte[] body) throws IOException {
@@ -132,12 +128,10 @@ public void testConfirmListenerRecovery() throws IOException, InterruptedExcepti
132128
int n = 3;
133129
final CountDownLatch latch = new CountDownLatch(n);
134130
channel.addConfirmListener(new ConfirmListener() {
135-
@Override
136131
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
137132
latch.countDown();
138133
}
139134

140-
@Override
141135
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
142136
latch.countDown();
143137
}
@@ -312,7 +306,7 @@ public void handleDelivery(String consumerTag,
312306
if (consumed.intValue() > 0 && consumed.intValue() % 4 == 0) {
313307
CountDownLatch recoveryLatch = prepareForRecovery(connection);
314308
Host.closeConnection((AutorecoveringConnection)connection);
315-
recoveryLatch.await(30, TimeUnit.MINUTES);
309+
ConnectionRecovery.wait(recoveryLatch);
316310
}
317311
channel.basicAck(envelope.getDeliveryTag(), false);
318312
} catch (InterruptedException e) {
@@ -353,7 +347,6 @@ private void expectQueueRecovery(Channel ch, String q) throws IOException, Inter
353347
private CountDownLatch prepareForRecovery(Connection conn) {
354348
final CountDownLatch latch = new CountDownLatch(1);
355349
((AutorecoveringConnection)conn).addRecoveryListener(new RecoveryListener() {
356-
@Override
357350
public void handleRecovery(Recoverable recoverable) {
358351
latch.countDown();
359352
}
@@ -364,7 +357,6 @@ public void handleRecovery(Recoverable recoverable) {
364357
private CountDownLatch prepareForShutdown(Connection conn) throws InterruptedException {
365358
final CountDownLatch latch = new CountDownLatch(1);
366359
conn.addShutdownListener(new ShutdownListener() {
367-
@Override
368360
public void shutdownCompleted(ShutdownSignalException cause) {
369361
latch.countDown();
370362
}
@@ -427,10 +419,10 @@ private ConnectionFactory buildConnectionFactoryWithRecoveryEnabled(boolean disa
427419
return cf;
428420
}
429421

430-
private void wait(CountDownLatch latch) throws InterruptedException {
422+
private static void wait(CountDownLatch latch) throws InterruptedException {
431423
// Very very generous amount of time to wait, just make sure we never
432424
// hang forever
433-
assertTrue(latch.await(30, TimeUnit.MINUTES));
425+
assertTrue(latch.await(1800, TimeUnit.SECONDS));
434426
}
435427

436428
private void waitForConfirms(Channel ch) throws InterruptedException, TimeoutException {

test/src/com/rabbitmq/client/test/functional/FunctionalTests.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,5 +77,6 @@ public static void add(TestSuite suite) {
7777
suite.addTestSuite(Policies.class);
7878
suite.addTestSuite(ConnectionRecovery.class);
7979
suite.addTestSuite(ExceptionHandling.class);
80+
suite.addTestSuite(PerConsumerPrefetch.class);
8081
}
8182
}
Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,132 @@
1+
package com.rabbitmq.client.test.functional;
2+
3+
import com.rabbitmq.client.GetResponse;
4+
import com.rabbitmq.client.QueueingConsumer;
5+
import com.rabbitmq.client.QueueingConsumer.Delivery;
6+
import com.rabbitmq.client.test.BrokerTestCase;
7+
8+
import java.io.IOException;
9+
import java.util.Arrays;
10+
import java.util.List;
11+
12+
import static com.rabbitmq.client.test.functional.QosTests.drain;
13+
14+
public class PerConsumerPrefetch extends BrokerTestCase {
15+
private String q;
16+
17+
@Override
18+
protected void createResources() throws IOException {
19+
q = channel.queueDeclare().getQueue();
20+
}
21+
22+
private interface Closure {
23+
public void makeMore(List<Delivery> deliveries) throws IOException;
24+
}
25+
26+
public void testSingleAck() throws IOException {
27+
testPrefetch(new Closure() {
28+
public void makeMore(List<Delivery> deliveries) throws IOException {
29+
for (Delivery del : deliveries) {
30+
ack(del, false);
31+
}
32+
}
33+
});
34+
}
35+
36+
public void testMultiAck() throws IOException {
37+
testPrefetch(new Closure() {
38+
public void makeMore(List<Delivery> deliveries) throws IOException {
39+
ack(deliveries.get(deliveries.size() - 1), true);
40+
}
41+
});
42+
}
43+
44+
public void testSingleNack() throws IOException {
45+
for (final boolean requeue: Arrays.asList(false, true)) {
46+
testPrefetch(new Closure() {
47+
public void makeMore(List<Delivery> deliveries) throws IOException {
48+
for (Delivery del : deliveries) {
49+
nack(del, false, requeue);
50+
}
51+
}
52+
});
53+
}
54+
}
55+
56+
public void testMultiNack() throws IOException {
57+
for (final boolean requeue: Arrays.asList(false, true)) {
58+
testPrefetch(new Closure() {
59+
public void makeMore(List<Delivery> deliveries) throws IOException {
60+
nack(deliveries.get(deliveries.size() - 1), true, requeue);
61+
}
62+
});
63+
}
64+
}
65+
66+
public void testRecover() throws IOException {
67+
testPrefetch(new Closure() {
68+
public void makeMore(List<Delivery> deliveries) throws IOException {
69+
channel.basicRecover();
70+
}
71+
});
72+
}
73+
74+
private void testPrefetch(Closure closure) throws IOException {
75+
QueueingConsumer c = new QueueingConsumer(channel);
76+
publish(q, 15);
77+
consume(c, 5, false);
78+
List<Delivery> deliveries = drain(c, 5);
79+
80+
ack(channel.basicGet(q, false), false);
81+
drain(c, 0);
82+
83+
closure.makeMore(deliveries);
84+
drain(c, 5);
85+
}
86+
87+
public void testPrefetchOnEmpty() throws IOException {
88+
QueueingConsumer c = new QueueingConsumer(channel);
89+
publish(q, 5);
90+
consume(c, 10, false);
91+
drain(c, 5);
92+
publish(q, 10);
93+
drain(c, 5);
94+
}
95+
96+
public void testAutoAckIgnoresPrefetch() throws IOException {
97+
QueueingConsumer c = new QueueingConsumer(channel);
98+
publish(q, 10);
99+
consume(c, 1, true);
100+
drain(c, 10);
101+
}
102+
103+
public void testPrefetchZeroMeansInfinity() throws IOException {
104+
QueueingConsumer c = new QueueingConsumer(channel);
105+
publish(q, 10);
106+
consume(c, 0, false);
107+
drain(c, 10);
108+
}
109+
110+
private void publish(String q, int n) throws IOException {
111+
for (int i = 0; i < n; i++) {
112+
channel.basicPublish("", q, null, "".getBytes());
113+
}
114+
}
115+
116+
private void consume(QueueingConsumer c, int prefetch, boolean autoAck) throws IOException {
117+
channel.basicQos(prefetch);
118+
channel.basicConsume(q, autoAck, c);
119+
}
120+
121+
private void ack(Delivery del, boolean multi) throws IOException {
122+
channel.basicAck(del.getEnvelope().getDeliveryTag(), multi);
123+
}
124+
125+
private void ack(GetResponse get, boolean multi) throws IOException {
126+
channel.basicAck(get.getEnvelope().getDeliveryTag(), multi);
127+
}
128+
129+
private void nack(Delivery del, boolean multi, boolean requeue) throws IOException {
130+
channel.basicNack(del.getEnvelope().getDeliveryTag(), multi, requeue);
131+
}
132+
}

0 commit comments

Comments
 (0)