Skip to content

Commit 774e215

Browse files
author
Hubert Plociniczak
committed
Merged default into bug19497
2 parents 6e1ce0b + 07d9bd1 commit 774e215

18 files changed

+313
-147
lines changed

build.xml

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
<?xml version="1.0"?>
2-
<project default="build">
2+
<project name="RabbitMQ Java client" default="build">
33

44
<property file="build.properties"/>
55
<property file="config.properties"/>
@@ -44,17 +44,37 @@
4444
<target name="amqp-generate" depends="amqp-generate-check"
4545
unless="amqp.generate.notRequired" description="generate AMQP.java and AMQImpl.java from AMQP spec">
4646
<mkdir dir="${src.generated}/com/rabbitmq/client/"/>
47-
<exec dir="." executable="python" output="${src.generated}/com/rabbitmq/client/AMQP.java">
47+
<exec dir="." executable="python"
48+
output="${src.generated}/com/rabbitmq/client/AMQP.java"
49+
errorproperty="amqp.generate.error1"
50+
resultproperty="amqp.generate.result1">
4851
<arg line="codegen.py"/>
4952
<arg line="header"/>
5053
<arg line="${AMQP_SPEC_JSON_PATH}"/>
5154
</exec>
55+
<fail message="Generation of AMQP.java failed with message:${line.separator}${amqp.generate.error1}">
56+
<condition>
57+
<not>
58+
<equals arg1="${amqp.generate.result1}" arg2="0" />
59+
</not>
60+
</condition>
61+
</fail>
5262
<mkdir dir="${src.generated}/com/rabbitmq/client/impl"/>
53-
<exec dir="." executable="python" output="${src.generated}/com/rabbitmq/client/impl/AMQImpl.java">
63+
<exec dir="." executable="python"
64+
output="${src.generated}/com/rabbitmq/client/impl/AMQImpl.java"
65+
errorproperty="amqp.generate.error2"
66+
resultproperty="amqp.generate.result2">
5467
<arg line="codegen.py"/>
5568
<arg line="body"/>
5669
<arg line="${AMQP_SPEC_JSON_PATH}"/>
5770
</exec>
71+
<fail message="Generation of AMQPImpl.java failed with message:${line.separator}${amqp.generate.error2}">
72+
<condition>
73+
<not>
74+
<equals arg1="${amqp.generate.result2}" arg2="0" />
75+
</not>
76+
</condition>
77+
</fail>
5878
</target>
5979

6080
<target name="build" depends="amqp-generate">

src/com/rabbitmq/client/Channel.java

Lines changed: 38 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -70,9 +70,18 @@ public interface Channel extends ShutdownNotifier{
7070
Connection getConnection();
7171

7272
/**
73-
* Close this channel with the given code and message
73+
* Close this channel with the {@link com.rabbitmq.client.AMQP#REPLY_SUCCESS} close code
74+
* and message 'OK'.
75+
*
76+
* @throws java.io.IOException if an error is encountered
77+
*/
78+
void close() throws IOException;
79+
80+
/**
81+
* Close this channel.
82+
*
7483
* @param closeCode the close code (See under "Reply Codes" in the AMQP specification)
75-
* @param closeMessage a message indicating the reason for closing the channel
84+
* @param closeMessage a message indicating the reason for closing the connection
7685
* @throws java.io.IOException if an error is encountered
7786
*/
7887
void close(int closeCode, String closeMessage) throws IOException;
@@ -296,6 +305,19 @@ Queue.DeclareOk queueDeclare(int ticket, String queue, boolean passive, boolean
296305
*/
297306
Queue.BindOk queueBind(int ticket, String queue, String exchange, String routingKey) throws IOException;
298307

308+
/**
309+
* Uninds a queue from an exchange, with no extra arguments.
310+
* @see com.rabbitmq.client.AMQP.Queue.Unbind
311+
* @see com.rabbitmq.client.AMQP.Queue.UnbindOk
312+
* @param ticket an access ticket for the appropriate realm
313+
* @param queue the name of the queue
314+
* @param exchange the name of the exchange
315+
* @param routingKey the routine key to use for the binding
316+
* @return an unbinding-confirm method if the binding was successfully deleted
317+
* @throws java.io.IOException if an error is encountered
318+
*/
319+
Queue.UnbindOk queueUnbind(int ticket, String queue, String exchange, String routingKey) throws IOException;
320+
299321
/**
300322
* Bind a queue to an exchange.
301323
* @see com.rabbitmq.client.AMQP.Queue.Bind
@@ -310,6 +332,20 @@ Queue.DeclareOk queueDeclare(int ticket, String queue, boolean passive, boolean
310332
*/
311333
Queue.BindOk queueBind(int ticket, String queue, String exchange, String routingKey, Map<String, Object> arguments) throws IOException;
312334

335+
/**
336+
* Unbind a queue from an exchange.
337+
* @see com.rabbitmq.client.AMQP.Queue.Unbind
338+
* @see com.rabbitmq.client.AMQP.Queue.UnbindOk
339+
* @param ticket an access ticket for the appropriate realm
340+
* @param queue the name of the queue
341+
* @param exchange the name of the exchange
342+
* @param routingKey the routine key to use for the binding
343+
* @param arguments other properties (binding parameters)
344+
* @return an unbinding-confirm method if the binding was successfully deleted
345+
* @throws java.io.IOException if an error is encountered
346+
*/
347+
Queue.UnbindOk queueUnbind(int ticket, String queue, String exchange, String routingKey, Map<String, Object> arguments) throws IOException;
348+
313349
/**
314350
* Retrieve a message from a queue using {@link com.rabbitmq.client.AMQP.Basic.Get}
315351
* @see com.rabbitmq.client.AMQP.Basic.Get

src/com/rabbitmq/client/Connection.java

Lines changed: 75 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -121,44 +121,103 @@ public interface Connection extends ShutdownNotifier { // rename to AMQPConnecti
121121
Channel createChannel(int channelNumber) throws IOException;
122122

123123
/**
124-
* Close this connection and all its channels.
124+
* Close this connection and all its channels
125+
* with the {@link com.rabbitmq.client.AMQP#REPLY_SUCCESS} close code
126+
* and message 'OK'.
125127
*
126-
* This method will wait infinitely for all the close operations to
127-
* complete.
128+
* Waits for all the close operations to complete.
128129
*
129130
* @throws IOException if an I/O problem is encountered
130131
*/
131132
void close() throws IOException;
133+
134+
/**
135+
* Close this connection and all its channels.
136+
*
137+
* Waits for all the close operations to complete.
138+
*
139+
* @param closeCode the close code (See under "Reply Codes" in the AMQP specification)
140+
* @param closeMessage a message indicating the reason for closing the connection
141+
* @throws IOException if an I/O problem is encountered
142+
*/
143+
void close(int closeCode, String closeMessage) throws IOException;
132144

133145
/**
134146
* Close this connection and all its channels
135-
*
136-
* This method will wait with the given timeout for all the close
137-
* operations to complete. If timeout is reached then socket is forced
138-
* to close
139-
* @param timeout timeout (in milioseconds) for completing all the close-related
147+
* with the {@link com.rabbitmq.client.AMQP#REPLY_SUCCESS} close code
148+
* and message 'OK'.
149+
*
150+
* This method behaves in a similar way as {@link #close()}, with the only difference
151+
* that it waits with a provided timeout for all the close operations to
152+
* complete. When timeout is reached the socket is forced to close.
153+
*
154+
* @param timeout timeout (in milliseconds) for completing all the close-related
140155
* operations, use -1 for infinity
141156
* @throws IOException if an I/O problem is encountered
142157
*/
143158
void close(int timeout) throws IOException;
159+
160+
/**
161+
* Close this connection and all its channels.
162+
*
163+
* Waits with the given timeout for all the close operations to complete.
164+
* When timeout is reached the socket is forced to close.
165+
*
166+
* @param closeCode the close code (See under "Reply Codes" in the AMQP specification)
167+
* @param closeMessage a message indicating the reason for closing the connection
168+
* @param timeout timeout (in milliseconds) for completing all the close-related
169+
* operations, use -1 for infinity
170+
* @throws IOException if an I/O problem is encountered
171+
*/
172+
void close(int closeCode, String closeMessage, int timeout) throws IOException;
144173

145174
/**
146-
* Abort this connection and all its channels.
175+
* Abort this connection and all its channels
176+
* with the {@link com.rabbitmq.client.AMQP#REPLY_SUCCESS} close code
177+
* and message 'OK'.
147178
*
148-
* This method will force the connection to close. It will silently discard
149-
* any exceptions enountered in close operations
179+
* Forces the connection to close.
180+
* Any encountered exceptions in the close operations are silently discarded.
150181
*/
151182
void abort();
152-
183+
153184
/**
154185
* Abort this connection and all its channels.
155186
*
156-
* This method behaves in a similar way as abort(), with the only difference
157-
* that it will wait with a provided timeout for all the close operations to
158-
* complete. If timeout is reached socket is forced to close.
187+
* Forces the connection to close and waits for all the close operations to complete.
188+
* Any encountered exceptions in the close operations are silently discarded.
189+
*
190+
* @param closeCode the close code (See under "Reply Codes" in the AMQP specification)
191+
* @param closeMessage a message indicating the reason for closing the connection
192+
*/
193+
void abort(int closeCode, String closeMessage);
194+
195+
/**
196+
* Abort this connection and all its channels
197+
* with the {@link com.rabbitmq.client.AMQP#REPLY_SUCCESS} close code
198+
* and message 'OK'.
199+
*
200+
* This method behaves in a similar way as {@link #abort()}, with the only difference
201+
* that it waits with a provided timeout for all the close operations to
202+
* complete. When timeout is reached the socket is forced to close.
159203
*
160-
* @param timeout timeout (in miliseconds) for completing all the close-related
204+
* @param timeout timeout (in milliseconds) for completing all the close-related
161205
* operations, use -1 for infinity
162206
*/
163207
void abort(int timeout);
208+
209+
/**
210+
* Abort this connection and all its channels.
211+
*
212+
* Forces the connection to close and waits with the given timeout
213+
* for all the close operations to complete. When timeout is reached
214+
* the socket is forced to close.
215+
* Any encountered exceptions in the close operations are silently discarded.
216+
*
217+
* @param closeCode the close code (See under "Reply Codes" in the AMQP specification)
218+
* @param closeMessage a message indicating the reason for closing the connection
219+
* @param timeout timeout (in milliseconds) for completing all the close-related
220+
* operations, use -1 for infinity
221+
*/
222+
void abort(int closeCode, String closeMessage, int timeout);
164223
}

src/com/rabbitmq/client/impl/AMQChannel.java

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -215,17 +215,29 @@ public synchronized void quiescingRpc(Method m, RpcContinuation k)
215215
@Override public String toString() {
216216
return "AMQChannel(" + _connection + "," + _channelNumber + ")";
217217
}
218-
218+
219219
/**
220220
* Protected API - respond, in the driver thread, to a {@link ShutdownSignalException}.
221221
* @param signal the signal to handle
222+
* @param ignoreClosed the flag indicating whether to ignore the AlreadyClosedException
223+
* thrown when the channel is already closed
224+
* @param notifyRpc the flag indicating whether any remaining rpc continuation should be
225+
* notified with the given signal
222226
*/
223-
public void processShutdownSignal(ShutdownSignalException signal) {
224-
synchronized (this) {
225-
ensureIsOpen(); // invariant: we should never be shut down more than once per instance
226-
_shutdownCause = signal;
227+
public void processShutdownSignal(ShutdownSignalException signal,
228+
boolean ignoreClosed,
229+
boolean notifyRpc) {
230+
try {
231+
synchronized (this) {
232+
if (!ignoreClosed)
233+
ensureIsOpen(); // invariant: we should never be shut down more than once per instance
234+
if (isOpen())
235+
_shutdownCause = signal;
236+
}
237+
} finally {
238+
if (notifyRpc)
239+
notifyOutstandingRpc(signal);
227240
}
228-
notifyOutstandingRpc(signal);
229241
}
230242

231243
public void notifyOutstandingRpc(ShutdownSignalException signal) {

0 commit comments

Comments
 (0)