@@ -191,10 +191,11 @@ public void handleDelivery(String consumerTag,
191191 synchronized (_continuationMap ) {
192192 String replyId = properties .getCorrelationId ();
193193 BlockingCell <Object > blocker =_continuationMap .remove (replyId );
194- if (blocker == null ) {
195- throw new IllegalStateException ("No outstanding request for correlation ID " + replyId );
196- }
197- blocker .set (new Response (consumerTag , envelope , properties , body ));
194+ if (blocker != null ) {
195+ blocker .set (new Response (consumerTag , envelope , properties , body ));
196+ } else {
197+ // Not an error. Entry will have been removed if request timed out.
198+ }
198199 }
199200 }
200201 };
@@ -217,15 +218,23 @@ public Response doCall(AMQP.BasicProperties props, byte[] message, int timeout)
217218 throws IOException , ShutdownSignalException , TimeoutException {
218219 checkConsumer ();
219220 BlockingCell <Object > k = new BlockingCell <Object >();
221+ String replyId ;
220222 synchronized (_continuationMap ) {
221223 _correlationId ++;
222- String replyId = "" + _correlationId ;
224+ replyId = "" + _correlationId ;
223225 props = ((props ==null ) ? new AMQP .BasicProperties .Builder () : props .builder ())
224226 .correlationId (replyId ).replyTo (_replyTo ).build ();
225227 _continuationMap .put (replyId , k );
226228 }
227229 publish (props , message );
228- Object reply = k .uninterruptibleGet (timeout );
230+ Object reply ;
231+ try {
232+ reply = k .uninterruptibleGet (timeout );
233+ } catch (TimeoutException ex ) {
234+ // Avoid potential leak. This entry is no longer needed by caller.
235+ _continuationMap .remove (replyId );
236+ throw ex ;
237+ }
229238 if (reply instanceof ShutdownSignalException ) {
230239 ShutdownSignalException sig = (ShutdownSignalException ) reply ;
231240 ShutdownSignalException wrapper =
0 commit comments