11package com .rabbitmq .client .test .functional ;
22
3+ import com .rabbitmq .client .AMQP ;
4+ import com .rabbitmq .client .Channel ;
35import com .rabbitmq .client .GetResponse ;
46import com .rabbitmq .client .QueueingConsumer ;
57import com .rabbitmq .client .QueueingConsumer .Delivery ;
@@ -102,6 +104,30 @@ public void testAutoAckIgnoresPrefetch() throws IOException {
102104 drain (c , 10 );
103105 }
104106
107+ public void testPrefetchZeroMeansInfinity () throws IOException {
108+ QueueingConsumer c = new QueueingConsumer (channel );
109+ publish (q , 10 );
110+ consume (c , 0 , false );
111+ drain (c , 10 );
112+ }
113+
114+ public void testPrefetchValidation () throws IOException {
115+ validationFail (-1 );
116+ validationFail (new HashMap <String , Object >());
117+ validationFail ("banana" );
118+ }
119+
120+ private void validationFail (Object badThing ) throws IOException {
121+ Channel ch = connection .createChannel ();
122+ QueueingConsumer c = new QueueingConsumer (ch );
123+
124+ try {
125+ ch .basicConsume (q , false , args (badThing ), c );
126+ } catch (IOException e ) {
127+ checkShutdownSignal (AMQP .PRECONDITION_FAILED , e );
128+ }
129+ }
130+
105131 private void publish (String q , int n ) throws IOException {
106132 for (int i = 0 ; i < n ; i ++) {
107133 channel .basicPublish ("" , q , null , "" .getBytes ());
@@ -124,11 +150,9 @@ private void nack(Delivery del, boolean multi, boolean requeue) throws IOExcepti
124150 channel .basicNack (del .getEnvelope ().getDeliveryTag (), multi , requeue );
125151 }
126152
127- private Map <String , Object > args (int prefetch ) {
153+ private Map <String , Object > args (Object prefetch ) {
128154 Map <String , Object > a = new HashMap <String , Object >();
129- if (prefetch != 0 ) {
130- a .put ("x-prefetch" , prefetch );
131- }
155+ a .put ("x-prefetch" , prefetch );
132156 return a ;
133157 }
134158}
0 commit comments