Skip to content

Commit 02728be

Browse files
committed
Merged 18557 into default
2 parents 6a71afb + 131bb99 commit 02728be

File tree

7 files changed

+622
-0
lines changed

7 files changed

+622
-0
lines changed

src/com/rabbitmq/client/Channel.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,35 @@ public interface Channel extends ShutdownNotifier{
120120
*/
121121
void setReturnListener(ReturnListener listener);
122122

123+
/**
124+
* Request specific "quality of service" settings.
125+
*
126+
* These settings impose limits on the amount of data the server
127+
* will deliver to consumers before requiring the receipt of
128+
* acknowledgements.
129+
* Thus they provide a means of consumer-initiated flow control.
130+
* @see com.rabbitmq.client.AMQP.Basic.Qos
131+
* @param prefetchSize maximum amount of content (measured in
132+
* octets) that the server will deliver, 0 if unlimited
133+
* @param prefetchCount maximum number of messages that the server
134+
* will deliver, 0 if unlimited
135+
* @param global true if the settings should be applied to the
136+
* entire connection rather than just the current channel
137+
* @throws java.io.IOException if an error is encountered
138+
*/
139+
void basicQos(int prefetchSize, int prefetchCount, boolean global) throws IOException;
140+
141+
/**
142+
* Request a specific prefetchCount "quality of service" settings
143+
* for this channel.
144+
*
145+
* @see #basicQos(int, int, boolean)
146+
* @param prefetchCount maximum number of messages that the server
147+
* will deliver, 0 if unlimited
148+
* @throws java.io.IOException if an error is encountered
149+
*/
150+
void basicQos(int prefetchCount) throws IOException;
151+
123152
/**
124153
* Publish a message with both "mandatory" and "immediate" flags set to false
125154
* @see com.rabbitmq.client.AMQP.Basic.Publish

src/com/rabbitmq/client/impl/ChannelN.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -360,6 +360,18 @@ public void close(int closeCode,
360360
}
361361
}
362362

363+
public void basicQos(int prefetchSize, int prefetchCount, boolean global)
364+
throws IOException
365+
{
366+
exnWrappingRpc(new Basic.Qos(prefetchSize, prefetchCount, global));
367+
}
368+
369+
public void basicQos(int prefetchCount)
370+
throws IOException
371+
{
372+
basicQos(0, prefetchCount, false);
373+
}
374+
363375
/**
364376
* Public API - Publish a message with both "mandatory" and
365377
* "immediate" flags set to false

test/src/com/rabbitmq/client/test/functional/BrokerTestCase.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,12 @@
3636
import junit.framework.TestCase;
3737

3838
import com.rabbitmq.client.Channel;
39+
import com.rabbitmq.client.Command;
3940
import com.rabbitmq.client.Connection;
4041
import com.rabbitmq.client.ConnectionFactory;
42+
import com.rabbitmq.client.ShutdownSignalException;
43+
44+
import com.rabbitmq.client.AMQP;
4145

4246
public class BrokerTestCase extends TestCase
4347
{
@@ -120,4 +124,17 @@ public void closeChannel()
120124
}
121125
}
122126

127+
public void checkShutdownSignal(int expectedCode, IOException ioe) {
128+
ShutdownSignalException sse = (ShutdownSignalException) ioe.getCause();
129+
Command closeCommand = (Command) sse.getReason();
130+
channel = null;
131+
if (sse.isHardError()) {
132+
connection = null;
133+
AMQP.Connection.Close closeMethod = (AMQP.Connection.Close) closeCommand.getMethod();
134+
assertEquals(expectedCode, closeMethod.getReplyCode());
135+
} else {
136+
AMQP.Channel.Close closeMethod = (AMQP.Channel.Close) closeCommand.getMethod();
137+
assertEquals(expectedCode, closeMethod.getReplyCode());
138+
}
139+
}
123140
}

test/src/com/rabbitmq/client/test/functional/FunctionalTests.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ public static TestSuite suite() {
4545
suite.addTestSuite(RequeueOnChannelClose.class);
4646
suite.addTestSuite(DurableOnTransient.class);
4747
suite.addTestSuite(NoRequeueOnCancel.class);
48+
suite.addTestSuite(QosTests.class);
4849
return suite;
4950
}
5051
}

0 commit comments

Comments
 (0)