2727import java .io .IOException ;
2828import java .util .concurrent .ArrayBlockingQueue ;
2929import java .util .concurrent .BlockingQueue ;
30+ import java .util .concurrent .CountDownLatch ;
3031
3132public class ConsumerCancelNotification extends BrokerTestCase {
3233
@@ -86,15 +87,17 @@ public void run() {
8687
8788 class AlteringConsumer extends DefaultConsumer {
8889 private final String altQueue ;
90+ private final CountDownLatch latch ;
8991
9092 /**
9193 * Constructs a new instance and records its association to the passed-in channel.
9294 *
9395 * @param channel the channel to which this consumer is attached
9496 */
95- public AlteringConsumer (Channel channel , String altQueue ) {
97+ public AlteringConsumer (Channel channel , String altQueue , CountDownLatch latch ) {
9698 super (channel );
9799 this .altQueue = altQueue ;
100+ this .latch = latch ;
98101 }
99102
100103 @ Override
@@ -107,6 +110,7 @@ public void handleShutdownSignal(String consumerTag,
107110 public void handleCancel (String consumerTag ) {
108111 try {
109112 this .getChannel ().queueDeclare (this .altQueue , false , true , false , null );
113+ latch .countDown ();
110114 } catch (IOException e ) {
111115 // e.printStackTrace();
112116 }
@@ -118,12 +122,14 @@ public void testConsumerCancellationHandlerUsesBlockingOperations()
118122 final String altQueue = "basic.cancel.fallback" ;
119123 channel .queueDeclare (queue , false , true , false , null );
120124
121- final AlteringConsumer consumer = new AlteringConsumer (channel , altQueue );
125+ CountDownLatch latch = new CountDownLatch (1 );
126+ final AlteringConsumer consumer = new AlteringConsumer (channel , altQueue , latch );
122127
123128 channel .basicConsume (queue , consumer );
124129 channel .queueDelete (queue );
125130
126- Thread .sleep (500 );
131+ latch .await ();
132+ // verify that handleCancel succeeded declaring the queue
127133 channel .queueDeclarePassive (altQueue );
128134 }
129135}
0 commit comments