2727import java .util .Map ;
2828import java .util .Map .Entry ;
2929import java .util .concurrent .TimeoutException ;
30+ import java .util .function .Function ;
3031
3132import com .rabbitmq .client .impl .MethodArgumentReader ;
3233import com .rabbitmq .client .impl .MethodArgumentWriter ;
@@ -58,6 +59,27 @@ public class RpcClient {
5859 private final int _timeout ;
5960 /** NO_TIMEOUT value must match convention on {@link BlockingCell#uninterruptibleGet(int)} */
6061 protected final static int NO_TIMEOUT = -1 ;
62+ /** Whether to publish RPC requests with the mandatory flag or not. */
63+ private final boolean _useMandatory ;
64+
65+ public final static Function <Object , Response > DEFAULT_REPLY_HANDLER = reply -> {
66+ if (reply instanceof ShutdownSignalException ) {
67+ ShutdownSignalException sig = (ShutdownSignalException ) reply ;
68+ ShutdownSignalException wrapper =
69+ new ShutdownSignalException (sig .isHardError (),
70+ sig .isInitiatedByApplication (),
71+ sig .getReason (),
72+ sig .getReference ());
73+ wrapper .initCause (sig );
74+ throw wrapper ;
75+ } else if (reply instanceof UnroutableRpcRequestException ) {
76+ throw (UnroutableRpcRequestException ) reply ;
77+ } else {
78+ return (Response ) reply ;
79+ }
80+ };
81+
82+ private final Function <Object , Response > _replyHandler ;
6183
6284 /** Map from request correlation ID to continuation BlockingCell */
6385 private final Map <String , BlockingCell <Object >> _continuationMap = new HashMap <String , BlockingCell <Object >>();
@@ -67,6 +89,46 @@ public class RpcClient {
6789 /** Consumer attached to our reply queue */
6890 private DefaultConsumer _consumer ;
6991
92+ /**
93+ * Construct a {@link RpcClient} with the passed-in {@link RpcClientParams}.
94+ *
95+ * @param params
96+ * @throws IOException
97+ * @see RpcClientParams
98+ * @since 5.6.0
99+ */
100+ public RpcClient (RpcClientParams params ) throws
101+ IOException {
102+ _channel = params .getChannel ();
103+ _exchange = params .getExchange ();
104+ _routingKey = params .getRoutingKey ();
105+ _replyTo = params .getReplyTo ();
106+ if (params .getTimeout () < NO_TIMEOUT ) {
107+ throw new IllegalArgumentException ("Timeout argument must be NO_TIMEOUT(-1) or non-negative." );
108+ }
109+ _timeout = params .getTimeout ();
110+ _useMandatory = params .isUseMandatory ();
111+ _replyHandler = params .getReplyHandler ();
112+ _correlationId = 0 ;
113+
114+ _consumer = setupConsumer ();
115+ if (_useMandatory ) {
116+ this ._channel .addReturnListener (returnMessage -> {
117+ synchronized (_continuationMap ) {
118+ String replyId = returnMessage .getProperties ().getCorrelationId ();
119+ BlockingCell <Object > blocker = _continuationMap .remove (replyId );
120+ if (blocker == null ) {
121+ // Entry should have been removed if request timed out,
122+ // log a warning nevertheless.
123+ LOGGER .warn ("No outstanding request for correlation ID {}" , replyId );
124+ } else {
125+ blocker .set (new UnroutableRpcRequestException (returnMessage ));
126+ }
127+ }
128+ });
129+ }
130+ }
131+
70132 /**
71133 * Construct a new RpcClient that will communicate on the given channel, sending
72134 * requests to the given exchange with the given routing key.
@@ -78,15 +140,19 @@ public class RpcClient {
78140 * @param replyTo the queue where the server should put the reply
79141 * @param timeout milliseconds before timing out on wait for response
80142 * @throws IOException if an error is encountered
143+ * @deprecated use {@link RpcClient#RpcClient(RpcClientParams)} instead, will be removed in 6.0.0
81144 */
145+ @ Deprecated
82146 public RpcClient (Channel channel , String exchange , String routingKey , String replyTo , int timeout ) throws
83147 IOException {
84148 _channel = channel ;
85149 _exchange = exchange ;
86150 _routingKey = routingKey ;
87151 _replyTo = replyTo ;
88- if (timeout < NO_TIMEOUT ) throw new IllegalArgumentException ("Timeout arguument must be NO_TIMEOUT(-1) or non-negative." );
152+ if (timeout < NO_TIMEOUT ) throw new IllegalArgumentException ("Timeout argument must be NO_TIMEOUT(-1) or non-negative." );
89153 _timeout = timeout ;
154+ _useMandatory = false ;
155+ _replyHandler = DEFAULT_REPLY_HANDLER ;
90156 _correlationId = 0 ;
91157
92158 _consumer = setupConsumer ();
@@ -106,7 +172,9 @@ public RpcClient(Channel channel, String exchange, String routingKey, String rep
106172 * @param routingKey the routing key
107173 * @param replyTo the queue where the server should put the reply
108174 * @throws IOException if an error is encountered
175+ * @deprecated use {@link RpcClient#RpcClient(RpcClientParams)} instead, will be removed in 6.0.0
109176 */
177+ @ Deprecated
110178 public RpcClient (Channel channel , String exchange , String routingKey , String replyTo ) throws IOException {
111179 this (channel , exchange , routingKey , replyTo , NO_TIMEOUT );
112180 }
@@ -123,7 +191,9 @@ public RpcClient(Channel channel, String exchange, String routingKey, String rep
123191 * @param exchange the exchange to connect to
124192 * @param routingKey the routing key
125193 * @throws IOException if an error is encountered
194+ * @deprecated use {@link RpcClient#RpcClient(RpcClientParams)} instead, will be removed in 6.0.0
126195 */
196+ @ Deprecated
127197 public RpcClient (Channel channel , String exchange , String routingKey ) throws IOException {
128198 this (channel , exchange , routingKey , "amq.rabbitmq.reply-to" , NO_TIMEOUT );
129199 }
@@ -142,7 +212,9 @@ public RpcClient(Channel channel, String exchange, String routingKey) throws IOE
142212 * @param routingKey the routing key
143213 * @param timeout milliseconds before timing out on wait for response
144214 * @throws IOException if an error is encountered
215+ * @deprecated use {@link RpcClient#RpcClient(RpcClientParams)} instead, will be removed in 6.0.0
145216 */
217+ @ Deprecated
146218 public RpcClient (Channel channel , String exchange , String routingKey , int timeout ) throws IOException {
147219 this (channel , exchange , routingKey , "amq.rabbitmq.reply-to" , timeout );
148220 }
@@ -213,7 +285,7 @@ public void handleDelivery(String consumerTag,
213285 public void publish (AMQP .BasicProperties props , byte [] message )
214286 throws IOException
215287 {
216- _channel .basicPublish (_exchange , _routingKey , props , message );
288+ _channel .basicPublish (_exchange , _routingKey , _useMandatory , props , message );
217289 }
218290
219291 public Response doCall (AMQP .BasicProperties props , byte [] message )
@@ -242,18 +314,7 @@ public Response doCall(AMQP.BasicProperties props, byte[] message, int timeout)
242314 _continuationMap .remove (replyId );
243315 throw ex ;
244316 }
245- if (reply instanceof ShutdownSignalException ) {
246- ShutdownSignalException sig = (ShutdownSignalException ) reply ;
247- ShutdownSignalException wrapper =
248- new ShutdownSignalException (sig .isHardError (),
249- sig .isInitiatedByApplication (),
250- sig .getReason (),
251- sig .getReference ());
252- wrapper .initCause (sig );
253- throw wrapper ;
254- } else {
255- return (Response ) reply ;
256- }
317+ return _replyHandler .apply (reply );
257318 }
258319
259320 public byte [] primitiveCall (AMQP .BasicProperties props , byte [] message )
0 commit comments