Skip to content

Commit 4de5d1c

Browse files
author
Simon MacMullen
committed
Prototype user of fast RPC replies.
1 parent 7dd191a commit 4de5d1c

File tree

1 file changed

+2
-25
lines changed

1 file changed

+2
-25
lines changed

src/com/rabbitmq/client/RpcClient.java

Lines changed: 2 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -58,8 +58,6 @@ public class RpcClient {
5858
/** Contains the most recently-used request correlation ID */
5959
private int _correlationId;
6060

61-
/** The name of our private reply queue */
62-
private String _replyQueue;
6361
/** Consumer attached to our reply queue */
6462
private DefaultConsumer _consumer;
6563

@@ -73,7 +71,6 @@ public class RpcClient {
7371
* @param routingKey the routing key
7472
* @param timeout milliseconds before timing out on wait for response
7573
* @throws IOException if an error is encountered
76-
* @see #setupReplyQueue
7774
*/
7875
public RpcClient(Channel channel, String exchange, String routingKey, int timeout) throws IOException {
7976
_channel = channel;
@@ -83,7 +80,6 @@ public RpcClient(Channel channel, String exchange, String routingKey, int timeou
8380
_timeout = timeout;
8481
_correlationId = 0;
8582

86-
_replyQueue = setupReplyQueue();
8783
_consumer = setupConsumer();
8884
}
8985

@@ -98,7 +94,6 @@ public RpcClient(Channel channel, String exchange, String routingKey, int timeou
9894
* @param exchange the exchange to connect to
9995
* @param routingKey the routing key
10096
* @throws IOException if an error is encountered
101-
* @see #setupReplyQueue
10297
*/
10398
public RpcClient(Channel channel, String exchange, String routingKey) throws IOException {
10499
this(channel, exchange, routingKey, NO_TIMEOUT);
@@ -125,16 +120,6 @@ public void close() throws IOException {
125120
}
126121
}
127122

128-
/**
129-
* Creates a server-named exclusive autodelete queue to use for
130-
* receiving replies to RPC requests.
131-
* @throws IOException if an error is encountered
132-
* @return the name of the reply queue
133-
*/
134-
protected String setupReplyQueue() throws IOException {
135-
return _channel.queueDeclare("", false, true, true, null).getQueue();
136-
}
137-
138123
/**
139124
* Registers a consumer on the reply queue.
140125
* @throws IOException if an error is encountered
@@ -167,7 +152,7 @@ public void handleDelivery(String consumerTag,
167152
}
168153
}
169154
};
170-
_channel.basicConsume(_replyQueue, true, consumer);
155+
_channel.basicConsume("amq.reply-to", true, consumer);
171156
return consumer;
172157
}
173158

@@ -186,7 +171,7 @@ public byte[] primitiveCall(AMQP.BasicProperties props, byte[] message)
186171
_correlationId++;
187172
String replyId = "" + _correlationId;
188173
props = ((props==null) ? new AMQP.BasicProperties.Builder() : props.builder())
189-
.correlationId(replyId).replyTo(_replyQueue).build();
174+
.correlationId(replyId).replyTo("amq.reply-consumer").build();
190175
_continuationMap.put(replyId, k);
191176
}
192177
publish(props, message);
@@ -332,14 +317,6 @@ public int getCorrelationId() {
332317
return _correlationId;
333318
}
334319

335-
/**
336-
* Retrieve the reply queue.
337-
* @return the name of the client's reply queue
338-
*/
339-
public String getReplyQueue() {
340-
return _replyQueue;
341-
}
342-
343320
/**
344321
* Retrieve the consumer.
345322
* @return an interface to the client's consumer object

0 commit comments

Comments
 (0)