@@ -86,11 +86,9 @@ protected void releaseResources() throws IOException {
8686 }
8787 }
8888
89- private void consumeExactly (QueueingConsumer consumer , int n )
89+ private void consumeNoDuplicates (QueueingConsumer consumer )
9090 throws ShutdownSignalException , InterruptedException {
91- for (; n > 0 ; --n ) {
92- assertNotNull (consumer .nextDelivery (TIMEOUT ));
93- }
91+ assertNotNull (consumer .nextDelivery (TIMEOUT ));
9492 Delivery markerDelivery = consumer .nextDelivery (TIMEOUT );
9593 assertEquals (new String (MARKER ), new String (markerDelivery .getBody ()));
9694 }
@@ -117,18 +115,18 @@ public void testBindingCreationDeletion() throws IOException {
117115 public void testSimpleChains () throws IOException , ShutdownSignalException ,
118116 InterruptedException {
119117 publishWithMarker ("e0" , "" );
120- consumeExactly (consumers [0 ], 1 );
118+ consumeNoDuplicates (consumers [0 ]);
121119
122120 channel .exchangeBind ("e0" , "e1" , "" );
123121 publishWithMarker ("e1" , "" );
124- consumeExactly (consumers [0 ], 1 );
125- consumeExactly (consumers [1 ], 1 );
122+ consumeNoDuplicates (consumers [0 ]);
123+ consumeNoDuplicates (consumers [1 ]);
126124
127125 channel .exchangeBind ("e1" , "e2" , "" );
128126 publishWithMarker ("e2" , "" );
129- consumeExactly (consumers [0 ], 1 );
130- consumeExactly (consumers [1 ], 1 );
131- consumeExactly (consumers [2 ], 1 );
127+ consumeNoDuplicates (consumers [0 ]);
128+ consumeNoDuplicates (consumers [1 ]);
129+ consumeNoDuplicates (consumers [2 ]);
132130
133131 channel .exchangeUnbind ("e0" , "e1" , "" );
134132 channel .exchangeUnbind ("e1" , "e2" , "" );
@@ -145,14 +143,14 @@ public void testDuplicateQueueDestinations() throws IOException,
145143 ShutdownSignalException , InterruptedException {
146144 channel .queueBind ("q1" , "e0" , "" );
147145 publishWithMarker ("e0" , "" );
148- consumeExactly (consumers [0 ], 1 );
149- consumeExactly (consumers [1 ], 1 );
146+ consumeNoDuplicates (consumers [0 ]);
147+ consumeNoDuplicates (consumers [1 ]);
150148
151149 channel .exchangeBind ("e0" , "e1" , "" );
152150
153151 publishWithMarker ("e1" , "" );
154- consumeExactly (consumers [0 ], 1 );
155- consumeExactly (consumers [1 ], 1 );
152+ consumeNoDuplicates (consumers [0 ]);
153+ consumeNoDuplicates (consumers [1 ]);
156154
157155 channel .exchangeUnbind ("e0" , "e1" , "" );
158156 }
@@ -172,12 +170,63 @@ public void testExchangeRoutingLoop() throws IOException,
172170 for (String e : exchanges ) {
173171 publishWithMarker (e , "" );
174172 for (QueueingConsumer c : consumers ) {
175- consumeExactly ( c , 1 );
173+ consumeNoDuplicates ( c );
176174 }
177175 }
178176
179177 channel .exchangeUnbind ("e0" , "e1" , "" );
180178 channel .exchangeUnbind ("e1" , "e2" , "" );
181179 channel .exchangeUnbind ("e2" , "e0" , "" );
182180 }
181+
182+ /* pre (eN --> qN) for N in [0..2]
183+ * create topic e and bind e --> eN with rk eN for N in [0..2]
184+ * test publish with rk to e
185+ * create direct ef and bind e --> ef with rk #
186+ * bind ef --> eN with rk eN for N in [0..2]
187+ * test publish with rk to e
188+ * ( end up with: e -(#)-> ef -(eN)-> eN --> qN;
189+ * e -(eN)-> eN for N in [0..2] )
190+ * Then remove the first set of bindings from e --> eN for N in [0..2]
191+ * test publish with rk to e
192+ */
193+ public void testTopicExchange () throws IOException , ShutdownSignalException , InterruptedException {
194+ channel .exchangeDeclare ("e" , "topic" );
195+ for (String e : exchanges ) {
196+ channel .exchangeBind (e , "e" , e );
197+ }
198+ for (String e : exchanges ) {
199+ publishWithMarker (e , e );
200+ }
201+ for (QueueingConsumer c : consumers ) {
202+ consumeNoDuplicates (c );
203+ }
204+
205+ channel .exchangeDeclare ("ef" , "direct" );
206+ channel .exchangeBind ("ef" , "e" , "#" );
207+
208+ for (String e : exchanges ) {
209+ channel .exchangeBind (e , "ef" , e );
210+ }
211+ for (String e : exchanges ) {
212+ publishWithMarker (e , e );
213+ }
214+ for (QueueingConsumer c : consumers ) {
215+ consumeNoDuplicates (c );
216+ }
217+
218+ channel .exchangeDelete ("ef" );
219+
220+ for (String e : exchanges ) {
221+ channel .exchangeUnbind (e , "e" , e );
222+ }
223+ for (String e : exchanges ) {
224+ publishWithMarker (e , e );
225+ }
226+ for (QueueingConsumer c : consumers ) {
227+ consumeNoDuplicates (c );
228+ }
229+
230+ channel .exchangeDelete ("e" );
231+ }
183232}
0 commit comments