Skip to content

Commit e1e1d5e

Browse files
janssk1acogoluegnes
authored andcommitted
637: Configurable correlatorId generation for RPCClient
Change-Id: I17d7a214d3336ad6e5fe890857a48b457ea599b6 (cherry picked from commit e554e4b)
1 parent c832a3f commit e1e1d5e

File tree

4 files changed

+57
-16
lines changed

4 files changed

+57
-16
lines changed
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
package com.rabbitmq.client;
2+
3+
import java.util.function.Supplier;
4+
5+
public class IncrementingCorrelationIdGenerator implements Supplier<String> {
6+
7+
private final String _prefix;
8+
private int _correlationId;
9+
10+
public IncrementingCorrelationIdGenerator(String _prefix) {
11+
this._prefix = _prefix;
12+
}
13+
14+
@Override
15+
public String get() {
16+
return _prefix + _correlationId++;
17+
}
18+
}

src/main/java/com/rabbitmq/client/RpcClient.java

Lines changed: 6 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import java.util.Map.Entry;
2929
import java.util.concurrent.TimeoutException;
3030
import java.util.function.Function;
31+
import java.util.function.Supplier;
3132

3233
import com.rabbitmq.client.impl.MethodArgumentReader;
3334
import com.rabbitmq.client.impl.MethodArgumentWriter;
@@ -79,12 +80,14 @@ public class RpcClient {
7980
}
8081
};
8182

83+
public static Supplier<String> DEFAULT_CORRELATION_ID_GENERATOR = new IncrementingCorrelationIdGenerator("");
84+
8285
private final Function<Object, Response> _replyHandler;
8386

8487
/** Map from request correlation ID to continuation BlockingCell */
8588
private final Map<String, BlockingCell<Object>> _continuationMap = new HashMap<String, BlockingCell<Object>>();
8689
/** Contains the most recently-used request correlation ID */
87-
private int _correlationId;
90+
private final Supplier<String> _correlationIdGenerator;
8891

8992
/** Consumer attached to our reply queue */
9093
private DefaultConsumer _consumer;
@@ -109,7 +112,7 @@ public RpcClient(RpcClientParams params) throws
109112
_timeout = params.getTimeout();
110113
_useMandatory = params.shouldUseMandatory();
111114
_replyHandler = params.getReplyHandler();
112-
_correlationId = 0;
115+
_correlationIdGenerator = params.getCorrelationIdGenerator();
113116

114117
_consumer = setupConsumer();
115118
if (_useMandatory) {
@@ -293,8 +296,7 @@ public Response doCall(AMQP.BasicProperties props, byte[] message, int timeout)
293296
BlockingCell<Object> k = new BlockingCell<Object>();
294297
String replyId;
295298
synchronized (_continuationMap) {
296-
_correlationId++;
297-
replyId = "" + _correlationId;
299+
replyId = _correlationIdGenerator.get();
298300
props = ((props==null) ? new AMQP.BasicProperties.Builder() : props.builder())
299301
.correlationId(replyId).replyTo(_replyTo).build();
300302
_continuationMap.put(replyId, k);
@@ -474,14 +476,6 @@ public Map<String, BlockingCell<Object>> getContinuationMap() {
474476
return _continuationMap;
475477
}
476478

477-
/**
478-
* Retrieve the correlation id.
479-
* @return the most recently used correlation id
480-
*/
481-
public int getCorrelationId() {
482-
return _correlationId;
483-
}
484-
485479
/**
486480
* Retrieve the consumer.
487481
* @return an interface to the client's consumer object

src/main/java/com/rabbitmq/client/RpcClientParams.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
package com.rabbitmq.client;
1717

1818
import java.util.function.Function;
19+
import java.util.function.Supplier;
1920

2021
/**
2122
* Holder class to configure a {@link RpcClient}.
@@ -54,6 +55,8 @@ public class RpcClientParams {
5455
*/
5556
private Function<Object, RpcClient.Response> replyHandler = RpcClient.DEFAULT_REPLY_HANDLER;
5657

58+
private Supplier<String> correlationIdGenerator = RpcClient.DEFAULT_CORRELATION_ID_GENERATOR;
59+
5760
/**
5861
* Set the channel to use for communication.
5962
*
@@ -170,6 +173,15 @@ public boolean shouldUseMandatory() {
170173
return useMandatory;
171174
}
172175

176+
public RpcClientParams correlationIdGenerator(Supplier<String> correlationIdGenerator) {
177+
this.correlationIdGenerator = correlationIdGenerator;
178+
return this;
179+
}
180+
181+
public Supplier<String> getCorrelationIdGenerator() {
182+
return correlationIdGenerator;
183+
}
184+
173185
public Function<Object, RpcClient.Response> getReplyHandler() {
174186
return replyHandler;
175187
}

src/test/java/com/rabbitmq/client/test/RpcTest.java

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import com.rabbitmq.client.impl.recovery.RecordedQueue;
2525
import com.rabbitmq.client.impl.recovery.TopologyRecoveryFilter;
2626
import com.rabbitmq.tools.Host;
27+
import org.hamcrest.CoreMatchers;
2728
import org.junit.After;
2829
import org.junit.Before;
2930
import org.junit.Test;
@@ -39,9 +40,7 @@
3940
import java.util.concurrent.atomic.AtomicInteger;
4041

4142
import static org.awaitility.Awaitility.waitAtMost;
42-
import static org.junit.Assert.assertEquals;
43-
import static org.junit.Assert.assertTrue;
44-
import static org.junit.Assert.fail;
43+
import static org.junit.Assert.*;
4544

4645
public class RpcTest {
4746

@@ -138,6 +137,25 @@ public void rpcUnroutableWithMandatoryFlagShouldThrowUnroutableException() throw
138137
client.close();
139138
}
140139

140+
@Test
141+
public void rpcCustomCorrelatorId() throws Exception {
142+
rpcServer = new TestRpcServer(serverChannel, queue);
143+
new Thread(() -> {
144+
try {
145+
rpcServer.mainloop();
146+
} catch (Exception e) {
147+
// safe to ignore when loops ends/server is canceled
148+
}
149+
}).start();
150+
RpcClient client = new RpcClient(new RpcClientParams()
151+
.channel(clientChannel).exchange("").routingKey(queue).timeout(1000)
152+
.correlationIdGenerator(new IncrementingCorrelationIdGenerator("myPrefix-"))
153+
);
154+
RpcClient.Response response = client.doCall(null, "hello".getBytes());
155+
assertThat(response.getProperties().getCorrelationId(), CoreMatchers.equalTo("myPrefix-0"));
156+
client.close();
157+
}
158+
141159
@Test
142160
public void rpcCustomReplyHandler() throws Exception {
143161
rpcServer = new TestRpcServer(serverChannel, queue);
@@ -156,7 +174,6 @@ public void rpcCustomReplyHandler() throws Exception {
156174
return RpcClient.DEFAULT_REPLY_HANDLER.apply(reply);
157175
})
158176
);
159-
assertEquals(0, replyHandlerCalls.get());
160177
RpcClient.Response response = client.doCall(null, "hello".getBytes());
161178
assertEquals(1, replyHandlerCalls.get());
162179
assertEquals("*** hello ***", new String(response.getBody()));

0 commit comments

Comments
 (0)