Skip to content

Commit 11fdaa9

Browse files
committed
Log warning when correlation ID isn't found in RpcClient
Add a test as well to check the outstanding request map is cleaned after a timeout. References #375
1 parent 11baba6 commit 11fdaa9

File tree

2 files changed

+22
-3
lines changed

2 files changed

+22
-3
lines changed

src/main/java/com/rabbitmq/client/RpcClient.java

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@
3333
import com.rabbitmq.client.impl.ValueReader;
3434
import com.rabbitmq.client.impl.ValueWriter;
3535
import com.rabbitmq.utility.BlockingCell;
36+
import org.slf4j.Logger;
37+
import org.slf4j.LoggerFactory;
3638

3739
/**
3840
* Convenience class which manages simple RPC-style communication.
@@ -41,6 +43,9 @@
4143
* and waiting for a response.
4244
*/
4345
public class RpcClient {
46+
47+
private static final Logger LOGGER = LoggerFactory.getLogger(RpcClient.class);
48+
4449
/** Channel we are communicating on */
4550
private final Channel _channel;
4651
/** Exchange to send requests to */
@@ -191,10 +196,12 @@ public void handleDelivery(String consumerTag,
191196
synchronized (_continuationMap) {
192197
String replyId = properties.getCorrelationId();
193198
BlockingCell<Object> blocker =_continuationMap.remove(replyId);
194-
if (blocker != null) {
195-
blocker.set(new Response(consumerTag, envelope, properties, body));
199+
if (blocker == null) {
200+
// Entry should have been removed if request timed out,
201+
// log a warning nevertheless.
202+
LOGGER.warn("No outstanding request for correlation ID {}", replyId);
196203
} else {
197-
// Not an error. Entry will have been removed if request timed out.
204+
blocker.set(new Response(consumerTag, envelope, properties, body));
198205
}
199206
}
200207
}

src/test/java/com/rabbitmq/client/test/RpcTest.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import java.io.IOException;
2525
import java.util.HashMap;
2626
import java.util.Map;
27+
import java.util.concurrent.TimeoutException;
2728

2829
import static org.junit.Assert.assertEquals;
2930

@@ -72,6 +73,17 @@ public void rpc() throws Exception {
7273
client.close();
7374
}
7475

76+
@Test public void rpcResponseTimeout() throws Exception {
77+
RpcClient client = new RpcClient(clientChannel, "", queue);
78+
try {
79+
client.doCall(null, "hello".getBytes(), 200);
80+
} catch (TimeoutException e) {
81+
// OK
82+
}
83+
assertEquals(0, client.getContinuationMap().size());
84+
client.close();
85+
}
86+
7587
private static class TestRpcServer extends RpcServer {
7688

7789
public TestRpcServer(Channel channel, String queueName) throws IOException {

0 commit comments

Comments
 (0)