|
| 1 | +// The contents of this file are subject to the Mozilla Public License |
| 2 | +// Version 1.1 (the "License"); you may not use this file except in |
| 3 | +// compliance with the License. You may obtain a copy of the License |
| 4 | +// at http://www.mozilla.org/MPL/ |
| 5 | +// |
| 6 | +// Software distributed under the License is distributed on an "AS IS" |
| 7 | +// basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See |
| 8 | +// the License for the specific language governing rights and |
| 9 | +// limitations under the License. |
| 10 | +// |
| 11 | +// The Original Code is RabbitMQ. |
| 12 | +// |
| 13 | +// The Initial Developer of the Original Code is VMware, Inc. |
| 14 | +// Copyright (c) 2007-2012 VMware, Inc. All rights reserved. |
| 15 | +// |
| 16 | + |
| 17 | + |
1 | 18 | package com.rabbitmq.client.test.functional; |
2 | 19 |
|
3 | 20 | import com.rabbitmq.client.*; |
|
6 | 23 |
|
7 | 24 | public class PerMessageTTL extends TTLHandling { |
8 | 25 |
|
9 | | - @Override |
10 | | - protected void createResources() throws IOException { |
11 | | - super.createResources(); |
12 | | - declareAndBindQueue(); |
13 | | - this.channel.confirmSelect(); |
14 | | - } |
| 26 | + private Object sessionTTL; |
15 | 27 |
|
16 | 28 | @Override |
17 | | - protected void releaseResources() throws IOException { |
18 | | - super.releaseResources(); |
19 | | - } |
20 | | - |
21 | | - public void testSupportedTTLTypes() throws IOException { |
22 | | - Object[] args = { (((byte)200) & (0xff)), (short)200, 200, 200L }; |
23 | | - for (Object ttl : args) { |
24 | | - try { |
25 | | - publishAndSynchronise(MSG[0], ttl); |
26 | | - } catch(Exception ex) { |
27 | | - fail("Should be able to use " + ttl.getClass().getName() + |
28 | | - " for basic.expiration: " + ex.getMessage()); |
29 | | - } |
30 | | - } |
31 | | - } |
32 | | - |
33 | | - public void testTTLAllowZero() throws Exception { |
34 | | - try { |
35 | | - publish(MSG[0], (byte) 0); |
36 | | - this.channel.waitForConfirmsOrDie(); |
37 | | - } catch (Exception e) { |
38 | | - fail("Should be able to publish with basic.expiration set to zero: " + |
39 | | - e.getMessage()); |
40 | | - } |
41 | | - } |
42 | | - |
43 | | - public void testPublishWithInvalidTTL() throws InterruptedException { |
| 29 | + public void testInvalidTypeUsedInTTL() throws Exception { |
44 | 30 | try { |
45 | | - publishAndSynchronise(MSG[0], "foobar"); |
46 | | - fail("Should not be able to set a non-long value for basic.expiration"); |
| 31 | + super.testInvalidTypeUsedInTTL(); |
47 | 32 | } catch (IOException e) { |
48 | 33 | checkShutdownSignal(AMQP.INTERNAL_ERROR, e); |
49 | 34 | } |
50 | 35 | } |
51 | 36 |
|
| 37 | + @Override |
52 | 38 | public void testTTLMustBePositive() throws Exception { |
53 | 39 | try { |
54 | | - publishAndSynchronise(MSG[0], -15); |
55 | | - fail("Should not be able to set a negative value for basic.expiration"); |
| 40 | + super.testTTLMustBePositive(); |
56 | 41 | } catch (IOException e) { |
57 | 42 | checkShutdownSignal(AMQP.INTERNAL_ERROR, e); |
58 | 43 | } |
59 | 44 | } |
60 | 45 |
|
61 | | - public void testMessagesExpireWhenUsingBasicGet() throws Exception { |
62 | | - publish(MSG[0], 200); |
63 | | - Thread.sleep(1000); |
64 | | - |
65 | | - String what = get(); |
66 | | - assertNull("expected message " + what + " to have been removed", what); |
67 | | - } |
68 | | - |
69 | | - public void testMultiplePublishAndGetWithExpiry() throws Exception { |
70 | | - // this seems quite timing dependent - would it not be better |
71 | | - // to test this by setting up a DLX and verifying that the |
72 | | - // expired messages have been sent there instead? |
73 | | - publish(MSG[0], 200); |
74 | | - Thread.sleep(500); |
75 | | - |
76 | | - publish(MSG[1], 200); |
77 | | - Thread.sleep(100); |
78 | | - |
79 | | - publish(MSG[2], 200); |
80 | | - |
81 | | - assertEquals(MSG[1], get()); |
82 | | - assertEquals(MSG[2], get()); |
83 | | - assertNull(get()); |
84 | | - } |
85 | | - |
86 | | - /* |
87 | | - * Test get expiry for messages sent under a transaction |
88 | | - */ |
89 | | - public void testTransactionalPublishWithGet() throws Exception { |
90 | | - closeChannel(); |
91 | | - openChannel(); |
92 | | - this.channel.txSelect(); |
93 | | - |
94 | | - publish(MSG[0], 100); |
95 | | - Thread.sleep(150); |
96 | | - |
97 | | - publish(MSG[1], 100); |
98 | | - this.channel.txCommit(); |
99 | | - Thread.sleep(50); |
100 | | - |
101 | | - assertEquals(MSG[0], get()); |
102 | | - Thread.sleep(80); |
103 | | - |
104 | | - assertNull(get()); |
105 | | - } |
106 | | - |
107 | | - /* |
108 | | - * Test expiry of requeued messages |
109 | | - */ |
110 | | - public void testExpiryWithReQueue() throws Exception { |
111 | | - publish(MSG[0], 100); |
112 | | - Thread.sleep(50); |
113 | | - |
114 | | - publish(MSG[1], 100); |
115 | | - publish(MSG[2], 100); |
116 | | - |
117 | | - expectBodyAndRemainingMessages(MSG[0], 2); |
118 | | - expectBodyAndRemainingMessages(MSG[1], 1); |
119 | | - |
120 | | - closeChannel(); |
121 | | - openChannel(); |
122 | | - |
123 | | - Thread.sleep(60); |
124 | | - expectBodyAndRemainingMessages(MSG[1], 1); |
125 | | - expectBodyAndRemainingMessages(MSG[2], 0); |
126 | | - } |
127 | | - |
128 | | - /* |
129 | | - * Test expiry of requeued messages after being consumed instantly |
130 | | - */ |
131 | | - public void testExpiryWithReQueueAfterConsume() throws Exception { |
132 | | - QueueingConsumer c = new QueueingConsumer(channel); |
133 | | - channel.basicConsume(TTL_QUEUE_NAME, c); |
134 | | - |
135 | | - publish(MSG[0], 100); |
136 | | - assertNotNull(c.nextDelivery(100)); |
137 | | - |
138 | | - closeChannel(); |
139 | | - Thread.sleep(150); |
140 | | - openChannel(); |
141 | | - |
142 | | - assertNull("Re-queued message not expired", get()); |
143 | | - } |
144 | | - |
145 | | - public void testZeroTTLDelivery() throws Exception { |
146 | | - // when there is no consumer, message should expire |
147 | | - publish(MSG[0], 0); |
148 | | - assertNull(get()); |
149 | | - |
150 | | - // when there is a consumer, message should be delivered |
151 | | - QueueingConsumer c = new QueueingConsumer(channel); |
152 | | - channel.basicConsume(TTL_QUEUE_NAME, c); |
153 | | - publish(MSG[0], 0); |
154 | | - QueueingConsumer.Delivery d = c.nextDelivery(100); |
155 | | - assertNotNull(d); |
156 | | - |
157 | | - // requeued messages should expire |
158 | | - channel.basicReject(d.getEnvelope().getDeliveryTag(), true); |
159 | | - assertNull(c.nextDelivery(100)); |
160 | | - } |
161 | | - |
162 | | - private void publishAndSynchronise(String msg, Object expiration) |
163 | | - throws IOException, InterruptedException { |
164 | | - publish(msg, expiration); |
165 | | - this.channel.basicQos(0); |
166 | | - } |
167 | | - |
168 | | - private void publish(String msg, Object exp) throws IOException { |
| 46 | + @Override |
| 47 | + protected void publish(String msg) throws IOException { |
169 | 48 | basicPublishVolatile(msg.getBytes(), TTL_EXCHANGE, TTL_QUEUE_NAME, |
170 | 49 | MessageProperties.TEXT_PLAIN |
171 | 50 | .builder() |
172 | | - .expiration(String.valueOf(exp)) |
| 51 | + .expiration(String.valueOf(sessionTTL)) |
173 | 52 | .build()); |
174 | 53 | } |
175 | 54 |
|
176 | | - private void declareAndBindQueue() throws IOException { |
177 | | - declareQueue(TTL_QUEUE_NAME); |
178 | | - this.channel.queueBind(TTL_QUEUE_NAME, TTL_EXCHANGE, TTL_QUEUE_NAME); |
179 | | - } |
180 | | - |
181 | | - private AMQP.Queue.DeclareOk declareQueue(String name) throws IOException { |
| 55 | + @Override |
| 56 | + protected AMQP.Queue.DeclareOk declareQueue(String name, Object ttlValue) throws IOException { |
| 57 | + this.sessionTTL = ttlValue; |
182 | 58 | return this.channel.queueDeclare(name, false, true, false, null); |
183 | 59 | } |
184 | | - |
185 | 60 | } |
0 commit comments