3333import com .rabbitmq .client .impl .ValueReader ;
3434import com .rabbitmq .client .impl .ValueWriter ;
3535import com .rabbitmq .utility .BlockingCell ;
36+ import org .slf4j .Logger ;
37+ import org .slf4j .LoggerFactory ;
3638
3739/**
3840 * Convenience class which manages simple RPC-style communication.
4143 * and waiting for a response.
4244*/
4345public class RpcClient {
46+
47+ private static final Logger LOGGER = LoggerFactory .getLogger (RpcClient .class );
48+
4449 /** Channel we are communicating on */
4550 private final Channel _channel ;
4651 /** Exchange to send requests to */
@@ -192,9 +197,12 @@ public void handleDelivery(String consumerTag,
192197 String replyId = properties .getCorrelationId ();
193198 BlockingCell <Object > blocker =_continuationMap .remove (replyId );
194199 if (blocker == null ) {
195- throw new IllegalStateException ("No outstanding request for correlation ID " + replyId );
200+ // Entry should have been removed if request timed out,
201+ // log a warning nevertheless.
202+ LOGGER .warn ("No outstanding request for correlation ID {}" , replyId );
203+ } else {
204+ blocker .set (new Response (consumerTag , envelope , properties , body ));
196205 }
197- blocker .set (new Response (consumerTag , envelope , properties , body ));
198206 }
199207 }
200208 };
@@ -217,15 +225,23 @@ public Response doCall(AMQP.BasicProperties props, byte[] message, int timeout)
217225 throws IOException , ShutdownSignalException , TimeoutException {
218226 checkConsumer ();
219227 BlockingCell <Object > k = new BlockingCell <Object >();
228+ String replyId ;
220229 synchronized (_continuationMap ) {
221230 _correlationId ++;
222- String replyId = "" + _correlationId ;
231+ replyId = "" + _correlationId ;
223232 props = ((props ==null ) ? new AMQP .BasicProperties .Builder () : props .builder ())
224233 .correlationId (replyId ).replyTo (_replyTo ).build ();
225234 _continuationMap .put (replyId , k );
226235 }
227236 publish (props , message );
228- Object reply = k .uninterruptibleGet (timeout );
237+ Object reply ;
238+ try {
239+ reply = k .uninterruptibleGet (timeout );
240+ } catch (TimeoutException ex ) {
241+ // Avoid potential leak. This entry is no longer needed by caller.
242+ _continuationMap .remove (replyId );
243+ throw ex ;
244+ }
229245 if (reply instanceof ShutdownSignalException ) {
230246 ShutdownSignalException sig = (ShutdownSignalException ) reply ;
231247 ShutdownSignalException wrapper =
0 commit comments