Skip to content

Commit 7b3d8a5

Browse files
committed
merge default into bug19250
2 parents c2cfe93 + d0d055f commit 7b3d8a5

22 files changed

+542
-224
lines changed

build.xml

Lines changed: 39 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
<?xml version="1.0"?>
2-
<project default="build">
2+
<project name="RabbitMQ Java client" default="build">
33

44
<property file="build.properties"/>
55
<property file="config.properties"/>
@@ -27,20 +27,54 @@
2727
</path>
2828

2929
<property name="AMQP_SPEC_JSON_PATH" value="${codegen.dir}/amqp-${spec.version}.json"/>
30-
31-
<target name="amqp-generate" description="generate AMQP.java and AMQImpl.java from AMQP spec">
30+
31+
<target name="amqp-generate-check" description="check if codegen needs to be run">
32+
<uptodate property="amqp.generate.notRequired">
33+
<srcfiles file="codegen.py"/>
34+
<srcfiles dir="${codegen.dir}">
35+
<include name="*" />
36+
</srcfiles>
37+
<compositemapper>
38+
<mapper type="merge" to="${basedir}/${src.generated}/com/rabbitmq/client/impl/AMQImpl.java" />
39+
<mapper type="merge" to="${basedir}/${src.generated}/com/rabbitmq/client/AMQP.java" />
40+
</compositemapper>
41+
</uptodate>
42+
</target>
43+
44+
<target name="amqp-generate" depends="amqp-generate-check"
45+
unless="amqp.generate.notRequired" description="generate AMQP.java and AMQImpl.java from AMQP spec">
3246
<mkdir dir="${src.generated}/com/rabbitmq/client/"/>
33-
<exec dir="." executable="python" output="${src.generated}/com/rabbitmq/client/AMQP.java">
47+
<exec dir="." executable="python"
48+
output="${src.generated}/com/rabbitmq/client/AMQP.java"
49+
errorproperty="amqp.generate.error1"
50+
resultproperty="amqp.generate.result1">
3451
<arg line="codegen.py"/>
3552
<arg line="header"/>
3653
<arg line="${AMQP_SPEC_JSON_PATH}"/>
3754
</exec>
55+
<fail message="Generation of AMQP.java failed with message:${line.separator}${amqp.generate.error1}">
56+
<condition>
57+
<not>
58+
<equals arg1="${amqp.generate.result1}" arg2="0" />
59+
</not>
60+
</condition>
61+
</fail>
3862
<mkdir dir="${src.generated}/com/rabbitmq/client/impl"/>
39-
<exec dir="." executable="python" output="${src.generated}/com/rabbitmq/client/impl/AMQImpl.java">
63+
<exec dir="." executable="python"
64+
output="${src.generated}/com/rabbitmq/client/impl/AMQImpl.java"
65+
errorproperty="amqp.generate.error2"
66+
resultproperty="amqp.generate.result2">
4067
<arg line="codegen.py"/>
4168
<arg line="body"/>
4269
<arg line="${AMQP_SPEC_JSON_PATH}"/>
4370
</exec>
71+
<fail message="Generation of AMQPImpl.java failed with message:${line.separator}${amqp.generate.error2}">
72+
<condition>
73+
<not>
74+
<equals arg1="${amqp.generate.result2}" arg2="0" />
75+
</not>
76+
</condition>
77+
</fail>
4478
</target>
4579

4680
<target name="build" depends="amqp-generate">
@@ -236,7 +270,6 @@
236270

237271
<target name="jar" depends="build">
238272
<mkdir dir="${lib.out}"/>
239-
<echo message="Javac out = ${javac.out}"/>
240273
<antcall target="doJarWithTags">
241274
<param name="jar.name" value="rabbitmq-client"/>
242275
<param name="base" value="${javac.out}"/>
@@ -267,7 +300,6 @@
267300
</target>
268301

269302
<target name="doJarWithTags">
270-
<echo message="Javac out = ${basedir}"/>
271303
<jar destfile="${lib.out}/${jar.name}.jar"
272304
basedir="${base}">
273305
<manifest>

scripts/runperftest.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
run() {
44
echo "=== running with '$2'"
5-
sh ./runjava.sh com.rabbitmq.examples.MulticastMain -h $1 -z 10 -i 20 $2
5+
sh `dirname $0`/runjava.sh com.rabbitmq.examples.MulticastMain -h $1 -z 10 -i 20 $2
66
sleep 2
77
}
88

src/com/rabbitmq/client/Channel.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -70,9 +70,18 @@ public interface Channel extends ShutdownNotifier{
7070
Connection getConnection();
7171

7272
/**
73-
* Close this channel with the given code and message
73+
* Close this channel with the {@link com.rabbitmq.client.AMQP#REPLY_SUCCESS} close code
74+
* and message 'OK'.
75+
*
76+
* @throws java.io.IOException if an error is encountered
77+
*/
78+
void close() throws IOException;
79+
80+
/**
81+
* Close this channel.
82+
*
7483
* @param closeCode the close code (See under "Reply Codes" in the AMQP specification)
75-
* @param closeMessage a message indicating the reason for closing the channel
84+
* @param closeMessage a message indicating the reason for closing the connection
7685
* @throws java.io.IOException if an error is encountered
7786
*/
7887
void close(int closeCode, String closeMessage) throws IOException;

src/com/rabbitmq/client/Connection.java

Lines changed: 75 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -121,44 +121,103 @@ public interface Connection extends ShutdownNotifier { // rename to AMQPConnecti
121121
Channel createChannel(int channelNumber) throws IOException;
122122

123123
/**
124-
* Close this connection and all its channels.
124+
* Close this connection and all its channels
125+
* with the {@link com.rabbitmq.client.AMQP#REPLY_SUCCESS} close code
126+
* and message 'OK'.
125127
*
126-
* This method will wait infinitely for all the close operations to
127-
* complete.
128+
* Waits for all the close operations to complete.
128129
*
129130
* @throws IOException if an I/O problem is encountered
130131
*/
131132
void close() throws IOException;
133+
134+
/**
135+
* Close this connection and all its channels.
136+
*
137+
* Waits for all the close operations to complete.
138+
*
139+
* @param closeCode the close code (See under "Reply Codes" in the AMQP specification)
140+
* @param closeMessage a message indicating the reason for closing the connection
141+
* @throws IOException if an I/O problem is encountered
142+
*/
143+
void close(int closeCode, String closeMessage) throws IOException;
132144

133145
/**
134146
* Close this connection and all its channels
135-
*
136-
* This method will wait with the given timeout for all the close
137-
* operations to complete. If timeout is reached then socket is forced
138-
* to close
139-
* @param timeout timeout (in milioseconds) for completing all the close-related
147+
* with the {@link com.rabbitmq.client.AMQP#REPLY_SUCCESS} close code
148+
* and message 'OK'.
149+
*
150+
* This method behaves in a similar way as {@link #close()}, with the only difference
151+
* that it waits with a provided timeout for all the close operations to
152+
* complete. When timeout is reached the socket is forced to close.
153+
*
154+
* @param timeout timeout (in milliseconds) for completing all the close-related
140155
* operations, use -1 for infinity
141156
* @throws IOException if an I/O problem is encountered
142157
*/
143158
void close(int timeout) throws IOException;
159+
160+
/**
161+
* Close this connection and all its channels.
162+
*
163+
* Waits with the given timeout for all the close operations to complete.
164+
* When timeout is reached the socket is forced to close.
165+
*
166+
* @param closeCode the close code (See under "Reply Codes" in the AMQP specification)
167+
* @param closeMessage a message indicating the reason for closing the connection
168+
* @param timeout timeout (in milliseconds) for completing all the close-related
169+
* operations, use -1 for infinity
170+
* @throws IOException if an I/O problem is encountered
171+
*/
172+
void close(int closeCode, String closeMessage, int timeout) throws IOException;
144173

145174
/**
146-
* Abort this connection and all its channels.
175+
* Abort this connection and all its channels
176+
* with the {@link com.rabbitmq.client.AMQP#REPLY_SUCCESS} close code
177+
* and message 'OK'.
147178
*
148-
* This method will force the connection to close. It will silently discard
149-
* any exceptions enountered in close operations
179+
* Forces the connection to close.
180+
* Any encountered exceptions in the close operations are silently discarded.
150181
*/
151182
void abort();
152-
183+
153184
/**
154185
* Abort this connection and all its channels.
155186
*
156-
* This method behaves in a similar way as abort(), with the only difference
157-
* that it will wait with a provided timeout for all the close operations to
158-
* complete. If timeout is reached socket is forced to close.
187+
* Forces the connection to close and waits for all the close operations to complete.
188+
* Any encountered exceptions in the close operations are silently discarded.
189+
*
190+
* @param closeCode the close code (See under "Reply Codes" in the AMQP specification)
191+
* @param closeMessage a message indicating the reason for closing the connection
192+
*/
193+
void abort(int closeCode, String closeMessage);
194+
195+
/**
196+
* Abort this connection and all its channels
197+
* with the {@link com.rabbitmq.client.AMQP#REPLY_SUCCESS} close code
198+
* and message 'OK'.
199+
*
200+
* This method behaves in a similar way as {@link #abort()}, with the only difference
201+
* that it waits with a provided timeout for all the close operations to
202+
* complete. When timeout is reached the socket is forced to close.
159203
*
160-
* @param timeout timeout (in miliseconds) for completing all the close-related
204+
* @param timeout timeout (in milliseconds) for completing all the close-related
161205
* operations, use -1 for infinity
162206
*/
163207
void abort(int timeout);
208+
209+
/**
210+
* Abort this connection and all its channels.
211+
*
212+
* Forces the connection to close and waits with the given timeout
213+
* for all the close operations to complete. When timeout is reached
214+
* the socket is forced to close.
215+
* Any encountered exceptions in the close operations are silently discarded.
216+
*
217+
* @param closeCode the close code (See under "Reply Codes" in the AMQP specification)
218+
* @param closeMessage a message indicating the reason for closing the connection
219+
* @param timeout timeout (in milliseconds) for completing all the close-related
220+
* operations, use -1 for infinity
221+
*/
222+
void abort(int closeCode, String closeMessage, int timeout);
164223
}

src/com/rabbitmq/client/impl/AMQChannel.java

Lines changed: 42 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -155,13 +155,6 @@ public synchronized void enqueueRpc(RpcContinuation k)
155155
_activeRpc = k;
156156
}
157157

158-
public synchronized void transmitAndEnqueue(Method m, RpcContinuation k)
159-
throws IOException
160-
{
161-
enqueueRpc(k);
162-
transmit(m);
163-
}
164-
165158
public synchronized RpcContinuation nextOutstandingRpc()
166159
{
167160
RpcContinuation result = _activeRpc;
@@ -200,20 +193,14 @@ public synchronized void rpc(Method m, RpcContinuation k)
200193
throws IOException
201194
{
202195
ensureIsOpen();
203-
transmitAndEnqueue(m, k);
196+
quiescingRpc(m, k);
204197
}
205-
206-
/**
207-
* Just like rpc(Method), but for use during quiescing/close/shutdown.
208-
* Not for regular use. Doesn't do the ensureIsOpen() check.
209-
*/
210-
public AMQCommand quiescingRpc(Method m,
211-
int timeoutMillisec)
212-
throws IOException, ShutdownSignalException, TimeoutException
198+
199+
public synchronized void quiescingRpc(Method m, RpcContinuation k)
200+
throws IOException
213201
{
214-
SimpleBlockingRpcContinuation k = new SimpleBlockingRpcContinuation();
215-
transmitAndEnqueue(m, k);
216-
return k.getReply(timeoutMillisec);
202+
enqueueRpc(k);
203+
quiescingTransmit(m);
217204
}
218205

219206
/**
@@ -228,26 +215,56 @@ public AMQCommand quiescingRpc(Method m,
228215
@Override public String toString() {
229216
return "AMQChannel(" + _connection + "," + _channelNumber + ")";
230217
}
231-
218+
232219
/**
233220
* Protected API - respond, in the driver thread, to a {@link ShutdownSignalException}.
234221
* @param signal the signal to handle
222+
* @param ignoreClosed the flag indicating whether to ignore the AlreadyClosedException
223+
* thrown when the channel is already closed
224+
* @param notifyRpc the flag indicating whether any remaining rpc continuation should be
225+
* notified with the given signal
235226
*/
236-
public void processShutdownSignal(ShutdownSignalException signal) {
237-
synchronized (this) {
238-
ensureIsOpen(); // invariant: we should never be shut down more than once per instance
239-
_shutdownCause = signal;
227+
public void processShutdownSignal(ShutdownSignalException signal,
228+
boolean ignoreClosed,
229+
boolean notifyRpc) {
230+
try {
231+
synchronized (this) {
232+
if (!ignoreClosed)
233+
ensureIsOpen(); // invariant: we should never be shut down more than once per instance
234+
if (isOpen())
235+
_shutdownCause = signal;
236+
}
237+
} finally {
238+
if (notifyRpc)
239+
notifyOutstandingRpc(signal);
240240
}
241+
}
242+
243+
public void notifyOutstandingRpc(ShutdownSignalException signal) {
241244
RpcContinuation k = nextOutstandingRpc();
242245
if (k != null) {
243246
k.handleShutdownSignal(signal);
244247
}
245248
}
246249

247-
public void transmit(Method m) throws IOException {
250+
public synchronized void transmit(Method m) throws IOException {
251+
ensureIsOpen();
252+
quiescingTransmit(m);
253+
}
254+
255+
public synchronized void transmit(AMQCommand c) throws IOException {
256+
ensureIsOpen();
257+
quiescingTransmit(c);
258+
}
259+
260+
public synchronized void quiescingTransmit(Method m) throws IOException {
248261
new AMQCommand(m).transmit(this);
249262
}
250263

264+
public synchronized void quiescingTransmit(AMQCommand c) throws IOException {
265+
c.transmit(this);
266+
}
267+
251268
public AMQConnection getAMQConnection() {
252269
return _connection;
253270
}

0 commit comments

Comments
 (0)