11package com .rabbitmq .client .test .functional ;
22
3+ import com .rabbitmq .client .AMQP ;
4+ import com .rabbitmq .client .Channel ;
35import com .rabbitmq .client .GetResponse ;
4- import com .rabbitmq .client .MessageProperties ;
56import com .rabbitmq .client .test .BrokerTestCase ;
67import com .rabbitmq .tools .Host ;
78
1617 */
1718public class Policies extends BrokerTestCase {
1819 @ Override protected void createResources () throws IOException {
19- setAE ();
20- setDLX ();
20+ setPolicy ("AE" , "^has-ae" , "{\\ \" alternate-exchange\\ \" :\\ \" ae\\ \" }" );
21+ setPolicy ("DLX" , "^has-dlx" , "{\\ \" dead-letter-exchange\\ \" :\\ \" dlx\\ \" \\ ,\\ \" dead-letter-routing-key\\ \" :\\ \" rk\\ \" }" );
22+ setPolicy ("TTL" , "^has-ttl" , "{\\ \" message-ttl\\ \" :10}" );
23+ setPolicy ("Expires" , "^has-expires" , "{\\ \" expires\\ \" :10}" );
24+ setPolicy ("MaxLength" , "^has-max-length" , "{\\ \" max-length\\ \" :1}" );
2125 channel .exchangeDeclare ("has-ae" , "fanout" );
2226 Map <String , Object > args = new HashMap <String , Object >();
2327 args .put ("alternate-exchange" , "ae2" );
2428 channel .exchangeDeclare ("has-ae-args" , "fanout" , false , false , args );
2529 }
2630
2731 public void testAlternateExchange () throws IOException {
28- String q = channel . queueDeclare (). getQueue ();
32+ String q = declareQueue ();
2933 channel .exchangeDeclare ("ae" , "fanout" , false , true , null );
3034 channel .queueBind (q , "ae" , "" );
31- channel . basicPublish ("has-ae" , "" , MessageProperties . BASIC , "" . getBytes () );
35+ basicPublishVolatile ("has-ae" , "" );
3236 assertDelivered (q , 1 );
3337 clearPolicies ();
34- channel .basicPublish ("has-ae" , "" , MessageProperties .BASIC , "" .getBytes ());
38+
39+ basicPublishVolatile ("has-ae" , "" );
3540 assertDelivered (q , 0 );
3641 }
3742
3843 // i.e. the argument takes priority over the policy
3944 public void testAlternateExchangeArgs () throws IOException {
40- String q = channel . queueDeclare (). getQueue ();
45+ String q = declareQueue ();
4146 channel .exchangeDeclare ("ae2" , "fanout" , false , true , null );
4247 channel .queueBind (q , "ae2" , "" );
43- channel . basicPublish ("has-ae-args" , "" , MessageProperties . BASIC , "" . getBytes () );
48+ basicPublishVolatile ("has-ae-args" , "" );
4449 assertDelivered (q , 1 );
4550 }
4651
4752 public void testDeadLetterExchange () throws IOException , InterruptedException {
48- Map <String , Object > args = new HashMap <String , Object >();
49- args .put ("x-message-ttl" , 0 );
50- String src = channel .queueDeclare ("has-dlx" , false , true , false , args ).getQueue ();
51- String dest = channel .queueDeclare ().getQueue ();
53+ Map <String , Object > args = ttlArgs (0 );
54+ String src = declareQueue ("has-dlx" , args );
55+ String dest = declareQueue ();
5256 channel .exchangeDeclare ("dlx" , "fanout" , false , true , null );
5357 channel .queueBind (dest , "dlx" , "" );
54- channel . basicPublish ( "" , src , MessageProperties . BASIC , "" . getBytes () );
58+ basicPublishVolatile ( src );
5559 Thread .sleep (10 );
5660 GetResponse resp = channel .basicGet (dest , true );
5761 assertEquals ("rk" , resp .getEnvelope ().getRoutingKey ());
5862 clearPolicies ();
59- channel .basicPublish ("" , src , MessageProperties .BASIC , "" .getBytes ());
63+
64+ basicPublishVolatile (src );
6065 Thread .sleep (10 );
6166 assertDelivered (dest , 0 );
6267 }
6368
6469 // again the argument takes priority over the policy
6570 public void testDeadLetterExchangeArgs () throws IOException , InterruptedException {
66- Map <String , Object > args = new HashMap <String , Object >();
67- args .put ("x-message-ttl" , 0 );
71+ Map <String , Object > args = ttlArgs (0 );
6872 args .put ("x-dead-letter-exchange" , "dlx2" );
6973 args .put ("x-dead-letter-routing-key" , "rk2" );
70- String src = channel . queueDeclare ("has-dlx-args" , false , true , false , args ). getQueue ( );
71- String dest = channel . queueDeclare (). getQueue ();
74+ String src = declareQueue ("has-dlx-args" , args );
75+ String dest = declareQueue ();
7276 channel .exchangeDeclare ("dlx2" , "fanout" , false , true , null );
7377 channel .queueBind (dest , "dlx2" , "" );
74- channel . basicPublish ( "" , src , MessageProperties . BASIC , "" . getBytes () );
78+ basicPublishVolatile ( src );
7579 Thread .sleep (10 );
7680 GetResponse resp = channel .basicGet (dest , true );
7781 assertEquals ("rk2" , resp .getEnvelope ().getRoutingKey ());
7882 }
7983
80- @ Override protected void releaseResources () throws IOException {
84+ public void testTTL () throws IOException , InterruptedException {
85+ String q = declareQueue ("has-ttl" , null );
86+ basicPublishVolatile (q );
87+ Thread .sleep (20 );
88+ assertDelivered (q , 0 );
8189 clearPolicies ();
82- channel .exchangeDelete ("has-ae" );
83- channel .exchangeDelete ("has-ae-args" );
90+
91+ basicPublishVolatile (q );
92+ Thread .sleep (20 );
93+ assertDelivered (q , 1 );
8494 }
8595
86- private Set <String > policies = new HashSet <String >();
96+ // Test that we get lower of args and policy
97+ public void testTTLArgs () throws IOException , InterruptedException {
98+ String q = declareQueue ("has-ttl" , ttlArgs (30 ));
99+ basicPublishVolatile (q );
100+ Thread .sleep (20 );
101+ assertDelivered (q , 0 );
102+ clearPolicies ();
87103
88- private void setAE () throws IOException {
89- setPolicy ("AE" , "^has-ae" , "{\\ \" alternate-exchange\\ \" :\\ \" ae\\ \" }" );
104+ basicPublishVolatile (q );
105+ Thread .sleep (20 );
106+ assertDelivered (q , 1 );
107+ basicPublishVolatile (q );
108+ Thread .sleep (40 );
109+ assertDelivered (q , 0 );
90110 }
91111
92- private void setDLX () throws IOException {
93- setPolicy ("DLX" , "^has-dlx" , "{\\ \" dead-letter-exchange\\ \" :\\ \" dlx\\ \" \\ ,\\ \" dead-letter-routing-key\\ \" :\\ \" rk\\ \" }" );
112+ public void testExpires () throws IOException , InterruptedException {
113+ String q = declareQueue ("has-expires" , null );
114+ Thread .sleep (20 );
115+ assertFalse (queueExists (q ));
116+ clearPolicies ();
117+
118+ q = declareQueue ("has-expires" , null );
119+ Thread .sleep (20 );
120+ assertTrue (queueExists (q ));
121+ }
122+
123+ // Test that we get lower of args and policy
124+ public void testExpiresArgs () throws IOException , InterruptedException {
125+ String q = declareQueue ("has-expires" , args ("x-expires" , 30 ));
126+ Thread .sleep (20 );
127+ assertFalse (queueExists (q ));
128+ clearPolicies ();
129+
130+ q = declareQueue ("has-expires" , args ("x-expires" , 30 ));
131+ Thread .sleep (20 );
132+ assertTrue (queueExists (q ));
133+ }
134+
135+ public void testMaxLength () throws IOException , InterruptedException {
136+ String q = declareQueue ("has-max-length" , null );
137+ basicPublishVolatileN (q , 3 );
138+ assertDelivered (q , 1 );
139+ clearPolicies ();
140+
141+ basicPublishVolatileN (q , 3 );
142+ assertDelivered (q , 3 );
94143 }
95144
145+ public void testMaxLengthArgs () throws IOException , InterruptedException {
146+ String q = declareQueue ("has-max-length" , args ("x-max-length" , 2 ));
147+ basicPublishVolatileN (q , 3 );
148+ assertDelivered (q , 1 );
149+ clearPolicies ();
150+
151+ basicPublishVolatileN (q , 3 );
152+ assertDelivered (q , 2 );
153+ }
154+
155+ @ Override protected void releaseResources () throws IOException {
156+ clearPolicies ();
157+ channel .exchangeDelete ("has-ae" );
158+ channel .exchangeDelete ("has-ae-args" );
159+ }
160+
161+ private Set <String > policies = new HashSet <String >();
162+
96163 private void setPolicy (String name , String pattern , String definition ) throws IOException {
97164 Host .rabbitmqctl ("set_policy " + name + " " + pattern + " " + definition );
98165 policies .add (name );
@@ -104,4 +171,39 @@ private void clearPolicies() throws IOException {
104171 }
105172 policies .clear ();
106173 }
107- }
174+
175+ private Map <String , Object > ttlArgs (int ttl ) {
176+ return args ("x-message-ttl" , ttl );
177+ }
178+
179+ private Map <String , Object > args (String name , Object value ) {
180+ Map <String , Object > args = new HashMap <String , Object >();
181+ args .put (name , value );
182+ return args ;
183+ }
184+
185+ private String declareQueue () throws IOException {
186+ return channel .queueDeclare ().getQueue ();
187+ }
188+
189+ private String declareQueue (String name , Map <String , Object > args ) throws IOException {
190+ return channel .queueDeclare (name , false , true , false , args ).getQueue ();
191+ }
192+
193+ private boolean queueExists (String name ) throws IOException {
194+ Channel ch2 = connection .createChannel ();
195+ try {
196+ ch2 .queueDeclarePassive (name );
197+ return true ;
198+ } catch (IOException ioe ) {
199+ return false ;
200+ }
201+ }
202+
203+ private void basicPublishVolatileN (String q , int count ) throws IOException {
204+ for (int i = 0 ; i < count ; i ++) {
205+ basicPublishVolatile (q );
206+ }
207+ }
208+
209+ }
0 commit comments