2828import java .util .ArrayList ;
2929import java .util .HashMap ;
3030import java .util .Map ;
31+ import java .util .concurrent .TimeoutException ;
3132
3233public class Routing extends BrokerTestCase
3334{
@@ -232,29 +233,19 @@ public void testHeadersRouting() throws Exception {
232233 checkGet (Q2 , false );
233234 }
234235
235- public void testBasicReturn () throws Exception {
236- channel .addReturnListener (new ReturnListener () {
237- public void handleReturn (int replyCode ,
238- String replyText ,
239- String exchange ,
240- String routingKey ,
241- AMQP .BasicProperties properties ,
242- byte [] body )
243- throws IOException {
244- Routing .this .returnCell .set (replyCode );
245- }
246- });
236+ public void testBasicReturn () throws IOException {
237+ channel .addReturnListener (makeReturnListener ());
247238 returnCell = new BlockingCell <Integer >();
239+
240+ //returned 'mandatory' publish
248241 channel .basicPublish ("" , "unknown" , true , false , null , "mandatory1" .getBytes ());
249- int replyCode = returnCell .uninterruptibleGet ();
250- assertEquals (replyCode , AMQP .NO_ROUTE );
242+ checkReturn (AMQP .NO_ROUTE );
251243
252- returnCell = new BlockingCell < Integer >();
244+ //routed 'mandatory' publish
253245 channel .basicPublish ("" , Q1 , true , false , null , "mandatory2" .getBytes ());
254- GetResponse r = channel .basicGet (Q1 , true );
255- assertNotNull (r );
256- assertEquals (new String (r .getBody ()), "mandatory2" );
246+ assertNotNull (channel .basicGet (Q1 , true ));
257247
248+ //'immediate' publish
258249 channel .basicPublish ("" , Q1 , false , true , null , "immediate" .getBytes ());
259250 try {
260251 channel .basicQos (0 ); //flush
@@ -264,4 +255,51 @@ public void handleReturn(int replyCode,
264255 }
265256 }
266257
258+ public void testBasicReturnTransactional () throws IOException {
259+ channel .txSelect ();
260+ channel .addReturnListener (makeReturnListener ());
261+ returnCell = new BlockingCell <Integer >();
262+
263+ //returned 'mandatory' publish
264+ channel .basicPublish ("" , "unknown" , true , false , null , "mandatory1" .getBytes ());
265+ try {
266+ returnCell .uninterruptibleGet (200 );
267+ fail ("basic.return issued prior to tx.commit" );
268+ } catch (TimeoutException toe ) {}
269+ channel .txCommit ();
270+ checkReturn (AMQP .NO_ROUTE );
271+
272+ //routed 'mandatory' publish
273+ channel .basicPublish ("" , Q1 , true , false , null , "mandatory2" .getBytes ());
274+ channel .txCommit ();
275+ assertNotNull (channel .basicGet (Q1 , true ));
276+
277+ //returned 'mandatory' publish when message is routable on
278+ //publish but not on commit
279+ channel .basicPublish ("" , Q1 , true , false , null , "mandatory2" .getBytes ());
280+ channel .queueDelete (Q1 );
281+ channel .txCommit ();
282+ checkReturn (AMQP .NO_ROUTE );
283+ channel .queueDeclare (Q1 , false , false , false , null );
284+ }
285+
286+ protected ReturnListener makeReturnListener () {
287+ return new ReturnListener () {
288+ public void handleReturn (int replyCode ,
289+ String replyText ,
290+ String exchange ,
291+ String routingKey ,
292+ AMQP .BasicProperties properties ,
293+ byte [] body )
294+ throws IOException {
295+ Routing .this .returnCell .set (replyCode );
296+ }
297+ };
298+ }
299+
300+ protected void checkReturn (int replyCode ) {
301+ assertEquals ((int )returnCell .uninterruptibleGet (), AMQP .NO_ROUTE );
302+ returnCell = new BlockingCell <Integer >();
303+ }
304+
267305}
0 commit comments