Skip to content

Commit 4da4704

Browse files
committed
Merge branch '5.3.x-stable' into 5.x.x-stable
2 parents 2b251b0 + 3cd26ff commit 4da4704

File tree

5 files changed

+262
-28
lines changed

5 files changed

+262
-28
lines changed

src/main/java/com/rabbitmq/client/impl/nio/NioLoopContext.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,18 @@
1+
// Copyright (c) 2007-Present Pivotal Software, Inc. All rights reserved.
2+
//
3+
// This software, the RabbitMQ Java client library, is triple-licensed under the
4+
// Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2
5+
// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see
6+
// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL,
7+
// please see LICENSE-APACHE2.
8+
//
9+
// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
10+
// either express or implied. See the LICENSE file for specific language governing
11+
// rights and limitations of this software.
12+
//
13+
// If you have any questions regarding licensing, please contact us at
14+
// info@rabbitmq.com.
15+
116
package com.rabbitmq.client.impl.nio;
217

318
import com.rabbitmq.client.impl.Environment;
@@ -38,6 +53,7 @@ public NioLoopContext(SocketChannelFrameHandlerFactory socketChannelFrameHandler
3853
}
3954

4055
void initStateIfNecessary() throws IOException {
56+
// FIXME this should be synchronized
4157
if (this.readSelectorState == null) {
4258
this.readSelectorState = new SelectorHolder(Selector.open());
4359
this.writeSelectorState = new SelectorHolder(Selector.open());

src/main/java/com/rabbitmq/tools/jsonrpc/JsonRpcClient.java

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@
2828
import com.rabbitmq.client.ShutdownSignalException;
2929
import com.rabbitmq.tools.json.JSONReader;
3030
import com.rabbitmq.tools.json.JSONWriter;
31+
import org.slf4j.Logger;
32+
import org.slf4j.LoggerFactory;
3133

3234
/**
3335
<a href="http://json-rpc.org">JSON-RPC</a> is a lightweight
@@ -54,6 +56,9 @@
5456
@see #call(String[])
5557
*/
5658
public class JsonRpcClient extends RpcClient implements InvocationHandler {
59+
60+
private static final Logger LOGGER = LoggerFactory.getLogger(JsonRpcClient.class);
61+
5762
/** Holds the JSON-RPC service description for this client. */
5863
private ServiceDescription serviceDescription;
5964

@@ -92,7 +97,6 @@ public static Object checkReply(Map<String, Object> reply)
9297
}
9398

9499
Object result = reply.get("result");
95-
//System.out.println(new JSONWriter().write(result));
96100
return result;
97101
}
98102

@@ -113,8 +117,12 @@ public Object call(String method, Object[] params) throws IOException, JsonRpcEx
113117
String requestStr = new JSONWriter().write(request);
114118
try {
115119
String replyStr = this.stringCall(requestStr);
120+
if (LOGGER.isDebugEnabled()) {
121+
LOGGER.debug("Reply string: {}", replyStr);
122+
}
116123
@SuppressWarnings("unchecked")
117124
Map<String, Object> map = (Map<String, Object>) (new JSONReader().read(replyStr));
125+
118126
return checkReply(map);
119127
} catch(ShutdownSignalException ex) {
120128
throw new IOException(ex.getMessage()); // wrap, re-throw
@@ -137,10 +145,11 @@ public Object invoke(Object proxy, Method method, Object[] args)
137145
/**
138146
* Public API - gets a dynamic proxy for a particular interface class.
139147
*/
140-
public Object createProxy(Class<?> klass)
148+
@SuppressWarnings("unchecked")
149+
public <T> T createProxy(Class<T> klass)
141150
throws IllegalArgumentException
142151
{
143-
return Proxy.newProxyInstance(klass.getClassLoader(),
152+
return (T) Proxy.newProxyInstance(klass.getClassLoader(),
144153
new Class[] { klass },
145154
this);
146155
}
@@ -161,8 +170,8 @@ public static Object coerce(String val, String type)
161170
return Double.valueOf(val);
162171
}
163172
} else if ("str".equals(type)) {
164-
return val;
165-
} else if ("arr".equals(type) || "obj".equals(type) || "any".equals(type)) {
173+
return val;
174+
} else if ("arr".equals(type) || "obj".equals(type) || "any".equals(type)) {
166175
return new JSONReader().read(val);
167176
} else if ("nil".equals(type)) {
168177
return null;

src/main/java/com/rabbitmq/tools/jsonrpc/JsonRpcServer.java

Lines changed: 53 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818

1919
import java.io.IOException;
2020
import java.lang.reflect.Method;
21+
import java.util.ArrayList;
22+
import java.util.Collection;
2123
import java.util.HashMap;
2224
import java.util.List;
2325
import java.util.Map;
@@ -27,6 +29,8 @@
2729
import com.rabbitmq.client.StringRpcServer;
2830
import com.rabbitmq.tools.json.JSONReader;
2931
import com.rabbitmq.tools.json.JSONWriter;
32+
import org.slf4j.Logger;
33+
import org.slf4j.LoggerFactory;
3034

3135
/**
3236
* JSON-RPC Server class.
@@ -41,6 +45,9 @@
4145
* @see JsonRpcClient
4246
*/
4347
public class JsonRpcServer extends StringRpcServer {
48+
49+
private static final Logger LOGGER = LoggerFactory.getLogger(JsonRpcServer.class);
50+
4451
/** Holds the JSON-RPC service description for this client. */
4552
public ServiceDescription serviceDescription;
4653
/** The interface this server implements. */
@@ -114,38 +121,62 @@ public String doCall(String requestBody)
114121
Object id;
115122
String method;
116123
Object[] params;
124+
String response;
125+
if (LOGGER.isDebugEnabled()) {
126+
LOGGER.debug("Request: {}", requestBody);
127+
}
117128
try {
118129
@SuppressWarnings("unchecked")
119130
Map<String, Object> request = (Map<String,Object>) new JSONReader().read(requestBody);
120131
if (request == null) {
121-
return errorResponse(null, 400, "Bad Request", null);
122-
}
123-
if (!ServiceDescription.JSON_RPC_VERSION.equals(request.get("version"))) {
124-
return errorResponse(null, 505, "JSONRPC version not supported", null);
132+
response = errorResponse(null, 400, "Bad Request", null);
133+
} else if (!ServiceDescription.JSON_RPC_VERSION.equals(request.get("version"))) {
134+
response = errorResponse(null, 505, "JSONRPC version not supported", null);
135+
} else {
136+
id = request.get("id");
137+
method = (String) request.get("method");
138+
List<?> parmList = (List<?>) request.get("params");
139+
params = parmList.toArray();
140+
if (method.equals("system.describe")) {
141+
response = resultResponse(id, serviceDescription);
142+
} else if (method.startsWith("system.")) {
143+
response = errorResponse(id, 403, "System methods forbidden", null);
144+
} else {
145+
Object result;
146+
try {
147+
Method matchingMethod = matchingMethod(method, params);
148+
if (LOGGER.isDebugEnabled()) {
149+
Collection<String> parametersValuesAndTypes = new ArrayList<String>();
150+
if (params != null) {
151+
for (Object param : params) {
152+
parametersValuesAndTypes.add(
153+
String.format("%s (%s)", param, param == null ? "?" : param.getClass())
154+
);
155+
}
156+
}
157+
LOGGER.debug("About to invoke {} method with parameters {}", matchingMethod, parametersValuesAndTypes);
158+
}
159+
result = matchingMethod.invoke(interfaceInstance, params);
160+
if (LOGGER.isDebugEnabled()) {
161+
LOGGER.debug("Invocation returned {} ({})", result, result == null ? "?" : result.getClass());
162+
}
163+
response = resultResponse(id, result);
164+
} catch (Throwable t) {
165+
LOGGER.info("Error while processing JSON RPC request", t);
166+
response = errorResponse(id, 500, "Internal Server Error", t);
167+
}
168+
}
125169
}
126-
127-
id = request.get("id");
128-
method = (String) request.get("method");
129-
List<?> parmList = (List<?>) request.get("params");
130-
params = parmList.toArray();
131170
} catch (ClassCastException cce) {
132171
// Bogus request!
133-
return errorResponse(null, 400, "Bad Request", null);
172+
response = errorResponse(null, 400, "Bad Request", null);
134173
}
135174

136-
if (method.equals("system.describe")) {
137-
return resultResponse(id, serviceDescription);
138-
} else if (method.startsWith("system.")) {
139-
return errorResponse(id, 403, "System methods forbidden", null);
140-
} else {
141-
Object result;
142-
try {
143-
result = matchingMethod(method, params).invoke(interfaceInstance, params);
144-
} catch (Throwable t) {
145-
return errorResponse(id, 500, "Internal Server Error", t);
146-
}
147-
return resultResponse(id, result);
175+
if (LOGGER.isDebugEnabled()) {
176+
LOGGER.debug("Response: {}", response);
148177
}
178+
179+
return response;
149180
}
150181

151182
/**
Lines changed: 176 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,176 @@
1+
// Copyright (c) 2018 Pivotal Software, Inc. All rights reserved.
2+
//
3+
// This software, the RabbitMQ Java client library, is triple-licensed under the
4+
// Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2
5+
// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see
6+
// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL,
7+
// please see LICENSE-APACHE2.
8+
//
9+
// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
10+
// either express or implied. See the LICENSE file for specific language governing
11+
// rights and limitations of this software.
12+
//
13+
// If you have any questions regarding licensing, please contact us at
14+
// info@rabbitmq.com.
15+
16+
package com.rabbitmq.client;
17+
18+
import com.rabbitmq.client.test.TestUtils;
19+
import com.rabbitmq.tools.jsonrpc.JsonRpcClient;
20+
import com.rabbitmq.tools.jsonrpc.JsonRpcServer;
21+
import org.junit.After;
22+
import org.junit.Before;
23+
import org.junit.Test;
24+
25+
import java.lang.reflect.UndeclaredThrowableException;
26+
27+
import static org.junit.Assert.assertEquals;
28+
import static org.junit.Assert.fail;
29+
30+
public class JsonRpcTest {
31+
32+
Connection clientConnection, serverConnection;
33+
Channel clientChannel, serverChannel;
34+
String queue = "json.rpc.queue";
35+
JsonRpcServer server;
36+
JsonRpcClient client;
37+
RpcService service;
38+
39+
@Before
40+
public void init() throws Exception {
41+
clientConnection = TestUtils.connectionFactory().newConnection();
42+
clientChannel = clientConnection.createChannel();
43+
serverConnection = TestUtils.connectionFactory().newConnection();
44+
serverChannel = serverConnection.createChannel();
45+
serverChannel.queueDeclare(queue, false, false, false, null);
46+
server = new JsonRpcServer(serverChannel, queue, RpcService.class, new DefaultRpcservice());
47+
new Thread(new Runnable() {
48+
49+
@Override
50+
public void run() {
51+
try {
52+
server.mainloop();
53+
} catch (Exception e) {
54+
// safe to ignore when loops ends/server is canceled
55+
}
56+
}
57+
}).start();
58+
client = new JsonRpcClient(clientChannel, "", queue, 1000);
59+
service = client.createProxy(RpcService.class);
60+
}
61+
62+
@After
63+
public void tearDown() throws Exception {
64+
if (server != null) {
65+
server.terminateMainloop();
66+
}
67+
if (client != null) {
68+
client.close();
69+
}
70+
if (serverChannel != null) {
71+
serverChannel.queueDelete(queue);
72+
}
73+
clientConnection.close();
74+
serverConnection.close();
75+
}
76+
77+
@Test
78+
public void rpc() {
79+
assertEquals("hello1", service.procedureString("hello"));
80+
assertEquals(2, service.procedureInteger(1).intValue());
81+
assertEquals(2, service.procedurePrimitiveInteger(1));
82+
assertEquals(2, service.procedureDouble(1.0).intValue());
83+
assertEquals(2, (int) service.procedurePrimitiveDouble(1.0));
84+
85+
try {
86+
assertEquals(2, (int) service.procedureLongToInteger(1L));
87+
fail("Long argument isn't supported");
88+
} catch (UndeclaredThrowableException e) {
89+
// OK
90+
}
91+
assertEquals(2, service.procedurePrimitiveLongToInteger(1L));
92+
93+
try {
94+
assertEquals(2, service.procedurePrimitiveLong(1L));
95+
fail("Long return type not supported");
96+
} catch (ClassCastException e) {
97+
// OK
98+
}
99+
100+
try {
101+
assertEquals(2, service.procedureLong(1L).longValue());
102+
fail("Long argument isn't supported");
103+
} catch (UndeclaredThrowableException e) {
104+
// OK
105+
}
106+
}
107+
108+
public interface RpcService {
109+
110+
String procedureString(String input);
111+
112+
int procedurePrimitiveInteger(int input);
113+
114+
Integer procedureInteger(Integer input);
115+
116+
Double procedureDouble(Double input);
117+
118+
double procedurePrimitiveDouble(double input);
119+
120+
Integer procedureLongToInteger(Long input);
121+
122+
int procedurePrimitiveLongToInteger(long input);
123+
124+
Long procedureLong(Long input);
125+
126+
long procedurePrimitiveLong(long input);
127+
}
128+
129+
public class DefaultRpcservice implements RpcService {
130+
131+
@Override
132+
public String procedureString(String input) {
133+
return input + 1;
134+
}
135+
136+
@Override
137+
public int procedurePrimitiveInteger(int input) {
138+
return input + 1;
139+
}
140+
141+
@Override
142+
public Integer procedureInteger(Integer input) {
143+
return input + 1;
144+
}
145+
146+
@Override
147+
public Long procedureLong(Long input) {
148+
return input + 1;
149+
}
150+
151+
@Override
152+
public long procedurePrimitiveLong(long input) {
153+
return input + 1L;
154+
}
155+
156+
@Override
157+
public Double procedureDouble(Double input) {
158+
return input + 1;
159+
}
160+
161+
@Override
162+
public double procedurePrimitiveDouble(double input) {
163+
return input + 1;
164+
}
165+
166+
@Override
167+
public Integer procedureLongToInteger(Long input) {
168+
return (int) (input + 1);
169+
}
170+
171+
@Override
172+
public int procedurePrimitiveLongToInteger(long input) {
173+
return (int) input + 1;
174+
}
175+
}
176+
}

src/test/java/com/rabbitmq/client/test/ClientTests.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package com.rabbitmq.client.test;
1818

19+
import com.rabbitmq.client.JsonRpcTest;
1920
import com.rabbitmq.utility.IntAllocatorTests;
2021
import org.junit.runner.RunWith;
2122
import org.junit.runners.Suite;
@@ -59,7 +60,8 @@
5960
ClientVersionTest.class,
6061
TestUtilsTest.class,
6162
StrictExceptionHandlerTest.class,
62-
NoAutoRecoveryWhenTcpWindowIsFullTest.class
63+
NoAutoRecoveryWhenTcpWindowIsFullTest.class,
64+
JsonRpcTest.class
6365
})
6466
public class ClientTests {
6567

0 commit comments

Comments
 (0)