4545/**
4646 * Test to trigger and check the fix of https://github.com/rabbitmq/rabbitmq-java-client/issues/341.
4747 * Conditions:
48- * - client registers consumer and a call QoS after
49- * - client get many messages and the consumer is slow
50- * - the work pool queue is full, the reading thread is stuck
51- * - more messages come from the network and saturates the TCP buffer
52- * - the connection dies but the client doesn't detect it
53- * - acks of messages fail
54- * - connection recovery is never triggered
55- *
48+ * - client registers consumer and a call QoS after
49+ * - client get many messages and the consumer is slow
50+ * - the work pool queue is full, the reading thread is stuck
51+ * - more messages come from the network and saturates the TCP buffer
52+ * - the connection dies but the client doesn't detect it
53+ * - acks of messages fail
54+ * - connection recovery is never triggered
55+ * <p>
5656 * The fix consists in triggering connection recovery when writing
5757 * to the socket fails. As the socket is dead, the closing
5858 * sequence can take some time, hence the setup of the shutdown
@@ -88,7 +88,7 @@ public void setUp() throws Exception {
8888 factory .setShutdownExecutor (shutdownService );
8989 factory .setShutdownTimeout (10000 );
9090 factory .setRequestedHeartbeat (5 );
91- factory .setShutdownExecutor (dispatchingService );
91+ factory .setSharedExecutor (dispatchingService );
9292 factory .setNetworkRecoveryInterval (1000 );
9393
9494 producingConnection = (AutorecoveringConnection ) factory .newConnection ("Producer Connection" );
@@ -101,15 +101,19 @@ public void setUp() throws Exception {
101101
102102 @ After
103103 public void tearDown () throws IOException {
104+ closeConnectionIfOpen (consumingConnection );
105+ closeConnectionIfOpen (producingConnection );
106+
104107 dispatchingService .shutdownNow ();
105108 producerService .shutdownNow ();
106109 shutdownService .shutdownNow ();
107- closeConnectionIfOpen (consumingConnection );
108- closeConnectionIfOpen (producingConnection );
109110 }
110111
111112 @ Test
112113 public void failureAndRecovery () throws IOException , InterruptedException {
114+ if (TestUtils .USE_NIO ) {
115+ return ;
116+ }
113117 final String queue = UUID .randomUUID ().toString ();
114118
115119 final CountDownLatch latch = new CountDownLatch (1 );
@@ -152,7 +156,7 @@ private void declareQueue(final Channel channel, final String queue) throws IOEx
152156 channel .queueDeclare (queue , false , false , false , queueArguments );
153157 }
154158
155- private void produceMessagesInBackground (final Channel channel , final String queue ) {
159+ private void produceMessagesInBackground (final Channel channel , final String queue ) throws IOException {
156160 final AMQP .BasicProperties properties = new AMQP .BasicProperties .Builder ().deliveryMode (1 ).build ();
157161 producerService .submit (new Callable <Void >() {
158162
@@ -178,7 +182,11 @@ public void handleRecoverOk(String consumerTag) {
178182 @ Override
179183 public void handleDelivery (String consumerTag , Envelope envelope , AMQP .BasicProperties properties , byte [] body ) throws IOException {
180184 consumerWork ();
181- consumingChannel .basicAck (envelope .getDeliveryTag (), false );
185+ try {
186+ consumingChannel .basicAck (envelope .getDeliveryTag (), false );
187+ } catch (Exception e ) {
188+ // application should handle writing exceptions
189+ }
182190 }
183191 });
184192 try {
0 commit comments