@@ -201,15 +201,15 @@ public static void sleep(int ms) {
201201 }
202202 }
203203
204- public Connection _connection ;
204+ private Connection _connection ;
205205
206- public Channel _ch1 ;
206+ private Channel _ch1 ;
207207
208- public int _messageId = 0 ;
208+ private int _messageId = 0 ;
209209
210- private boolean _silent ;
210+ private final boolean _silent ;
211211
212- private BlockingCell <Object > returnCell ;
212+ private volatile BlockingCell <Object > returnCell ;
213213
214214 public TestMain (Connection connection , boolean silent ) {
215215 _connection = connection ;
@@ -230,15 +230,6 @@ public void run() throws IOException {
230230
231231 _ch1 = createChannel ();
232232
233- _ch1 .addReturnListener (new ReturnListener () {
234- public void handleReturn (int replyCode , String replyText , String exchange , String routingKey , AMQP .BasicProperties properties , byte [] body )
235- throws IOException {
236- Method method = new AMQImpl .Basic .Return (replyCode , replyText , exchange , routingKey );
237- log ("Handling return with body " + new String (body ));
238- returnCell .set (new Object [] { method , properties , body });
239- }
240- });
241-
242233 String queueName =_ch1 .queueDeclare ().getQueue ();
243234
244235 sendLotsOfTrivialMessages (batchSize , queueName );
@@ -276,6 +267,18 @@ public void handleReturn(int replyCode, String replyText, String exchange, Strin
276267 log ("Leaving TestMain.run()." );
277268 }
278269
270+ private void setChannelReturnListener () {
271+ log ("Setting return listener.." );
272+ _ch1 .addReturnListener (new ReturnListener () {
273+ public void handleReturn (int replyCode , String replyText , String exchange , String routingKey , AMQP .BasicProperties properties , byte [] body )
274+ throws IOException {
275+ Method method = new AMQImpl .Basic .Return (replyCode , replyText , exchange , routingKey );
276+ log ("Handling return with body " + new String (body ));
277+ TestMain .this .returnCell .set (new Object [] { method , properties , body });
278+ }
279+ });
280+ }
281+
279282 public class UnexpectedSuccessException extends IOException {
280283 /**
281284 * Default version UID for serializable class
@@ -447,6 +450,8 @@ public void tryBasicReturn() throws IOException {
447450 String mx = "mandatoryTestExchange" ;
448451 _ch1 .exchangeDeclare (mx , "fanout" , false , true , null );
449452
453+ setChannelReturnListener ();
454+
450455 returnCell = new BlockingCell <Object >();
451456 _ch1 .basicPublish (mx , "" , true , false , null , "one" .getBytes ());
452457 doBasicReturn (returnCell , AMQP .NO_ROUTE );
@@ -472,7 +477,15 @@ public void tryBasicReturn() throws IOException {
472477 drain (1 , mq , true );
473478 _ch1 .queueDelete (mq , true , true );
474479
480+ unsetChannelReturnListener ();
481+
475482 log ("Completed basic.return testing." );
483+
484+ }
485+
486+ private void unsetChannelReturnListener () {
487+ _ch1 .clearReturnListeners ();
488+ log ("ReturnListeners unset" );
476489 }
477490
478491 public void waitForKey (String prompt ) throws IOException {
@@ -487,8 +500,12 @@ public void waitForKey(String prompt) throws IOException {
487500
488501 public void tryTransaction (String queueName ) throws IOException {
489502
503+ log ("About to tryTranscation" );
504+
490505 _ch1 .txSelect ();
491506
507+ setChannelReturnListener ();
508+
492509 //test basicReturn handling in tx context
493510 returnCell = new BlockingCell <Object >();
494511 _ch1 .basicPublish ("" , queueName , false , false , null , "normal" .getBytes ());
@@ -502,6 +519,8 @@ public void tryTransaction(String queueName) throws IOException {
502519 _ch1 .txCommit ();
503520 expect (2 , drain (10 , queueName , false ));
504521
522+ unsetChannelReturnListener ();
523+ log ("Finished tryTransaction" );
505524 }
506525
507526
0 commit comments