Skip to content

Commit a85d78c

Browse files
author
Hubert Plociniczak
committed
Merged default into bug19559
2 parents 35e8aed + c4f0019 commit a85d78c

19 files changed

+324
-152
lines changed

build.xml

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,17 +44,37 @@
4444
<target name="amqp-generate" depends="amqp-generate-check"
4545
unless="amqp.generate.notRequired" description="generate AMQP.java and AMQImpl.java from AMQP spec">
4646
<mkdir dir="${src.generated}/com/rabbitmq/client/"/>
47-
<exec dir="." executable="python" output="${src.generated}/com/rabbitmq/client/AMQP.java">
47+
<exec dir="." executable="python"
48+
output="${src.generated}/com/rabbitmq/client/AMQP.java"
49+
errorproperty="amqp.generate.error1"
50+
resultproperty="amqp.generate.result1">
4851
<arg line="codegen.py"/>
4952
<arg line="header"/>
5053
<arg line="${AMQP_SPEC_JSON_PATH}"/>
5154
</exec>
55+
<fail message="Generation of AMQP.java failed with message:${line.separator}${amqp.generate.error1}">
56+
<condition>
57+
<not>
58+
<equals arg1="${amqp.generate.result1}" arg2="0" />
59+
</not>
60+
</condition>
61+
</fail>
5262
<mkdir dir="${src.generated}/com/rabbitmq/client/impl"/>
53-
<exec dir="." executable="python" output="${src.generated}/com/rabbitmq/client/impl/AMQImpl.java">
63+
<exec dir="." executable="python"
64+
output="${src.generated}/com/rabbitmq/client/impl/AMQImpl.java"
65+
errorproperty="amqp.generate.error2"
66+
resultproperty="amqp.generate.result2">
5467
<arg line="codegen.py"/>
5568
<arg line="body"/>
5669
<arg line="${AMQP_SPEC_JSON_PATH}"/>
5770
</exec>
71+
<fail message="Generation of AMQPImpl.java failed with message:${line.separator}${amqp.generate.error2}">
72+
<condition>
73+
<not>
74+
<equals arg1="${amqp.generate.result2}" arg2="0" />
75+
</not>
76+
</condition>
77+
</fail>
5878
</target>
5979

6080
<target name="build" depends="amqp-generate">

src/com/rabbitmq/client/Channel.java

Lines changed: 51 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -70,9 +70,18 @@ public interface Channel extends ShutdownNotifier{
7070
Connection getConnection();
7171

7272
/**
73-
* Close this channel with the given code and message
73+
* Close this channel with the {@link com.rabbitmq.client.AMQP#REPLY_SUCCESS} close code
74+
* and message 'OK'.
75+
*
76+
* @throws java.io.IOException if an error is encountered
77+
*/
78+
void close() throws IOException;
79+
80+
/**
81+
* Close this channel.
82+
*
7483
* @param closeCode the close code (See under "Reply Codes" in the AMQP specification)
75-
* @param closeMessage a message indicating the reason for closing the channel
84+
* @param closeMessage a message indicating the reason for closing the connection
7685
* @throws java.io.IOException if an error is encountered
7786
*/
7887
void close(int closeCode, String closeMessage) throws IOException;
@@ -189,19 +198,6 @@ void basicPublish(int ticket, String exchange, String routingKey, boolean mandat
189198
*/
190199
Exchange.DeclareOk exchangeDeclare(int ticket, String exchange, String type, boolean durable) throws IOException;
191200

192-
/**
193-
* Actively declare a non-exclusive, non-autodelete queue
194-
* The name of the new queue is held in the "queue" field of the {@link com.rabbitmq.client.AMQP.Queue.DeclareOk} result.
195-
* @see com.rabbitmq.client.AMQP.Queue.Declare
196-
* @see com.rabbitmq.client.AMQP.Queue.DeclareOk
197-
* @param ticket an access ticket for the appropriate realm
198-
* @param queue the name of the queue
199-
* @param durable true if we are declaring a durable exchange (the exchange will survive a server restart)
200-
* @return a declaration-confirm method to indicate the exchange was successfully declared
201-
* @throws java.io.IOException if an error is encountered
202-
*/
203-
Queue.DeclareOk queueDeclare(int ticket, String queue, boolean durable) throws IOException;
204-
205201
/**
206202
* Declare an exchange, via an interface that allows the complete set of arguments
207203
* The name of the new queue is held in the "queue" field of the {@link com.rabbitmq.client.AMQP.Queue.DeclareOk} result.
@@ -241,6 +237,19 @@ Exchange.DeclareOk exchangeDeclare(int ticket, String exchange, String type, boo
241237
* @throws java.io.IOException if an error is encountered
242238
*/
243239
Queue.DeclareOk queueDeclare(int ticket, String queue) throws IOException;
240+
241+
/**
242+
* Actively declare a non-exclusive, non-autodelete queue
243+
* The name of the new queue is held in the "queue" field of the {@link com.rabbitmq.client.AMQP.Queue.DeclareOk} result.
244+
* @see com.rabbitmq.client.AMQP.Queue.Declare
245+
* @see com.rabbitmq.client.AMQP.Queue.DeclareOk
246+
* @param ticket an access ticket for the appropriate realm
247+
* @param queue the name of the queue
248+
* @param durable true if we are declaring a durable exchange (the exchange will survive a server restart)
249+
* @return a declaration-confirm method to indicate the exchange was successfully declared
250+
* @throws java.io.IOException if an error is encountered
251+
*/
252+
Queue.DeclareOk queueDeclare(int ticket, String queue, boolean durable) throws IOException;
244253

245254
/**
246255
* Declare a queue
@@ -309,6 +318,33 @@ Queue.DeclareOk queueDeclare(int ticket, String queue, boolean passive, boolean
309318
* @throws java.io.IOException if an error is encountered
310319
*/
311320
Queue.BindOk queueBind(int ticket, String queue, String exchange, String routingKey, Map<String, Object> arguments) throws IOException;
321+
322+
/**
323+
* Uninds a queue from an exchange, with no extra arguments.
324+
* @see com.rabbitmq.client.AMQP.Queue.Unbind
325+
* @see com.rabbitmq.client.AMQP.Queue.UnbindOk
326+
* @param ticket an access ticket for the appropriate realm
327+
* @param queue the name of the queue
328+
* @param exchange the name of the exchange
329+
* @param routingKey the routine key to use for the binding
330+
* @return an unbinding-confirm method if the binding was successfully deleted
331+
* @throws java.io.IOException if an error is encountered
332+
*/
333+
Queue.UnbindOk queueUnbind(int ticket, String queue, String exchange, String routingKey) throws IOException;
334+
335+
/**
336+
* Unbind a queue from an exchange.
337+
* @see com.rabbitmq.client.AMQP.Queue.Unbind
338+
* @see com.rabbitmq.client.AMQP.Queue.UnbindOk
339+
* @param ticket an access ticket for the appropriate realm
340+
* @param queue the name of the queue
341+
* @param exchange the name of the exchange
342+
* @param routingKey the routine key to use for the binding
343+
* @param arguments other properties (binding parameters)
344+
* @return an unbinding-confirm method if the binding was successfully deleted
345+
* @throws java.io.IOException if an error is encountered
346+
*/
347+
Queue.UnbindOk queueUnbind(int ticket, String queue, String exchange, String routingKey, Map<String, Object> arguments) throws IOException;
312348

313349
/**
314350
* Retrieve a message from a queue using {@link com.rabbitmq.client.AMQP.Basic.Get}

src/com/rabbitmq/client/Connection.java

Lines changed: 75 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -121,44 +121,103 @@ public interface Connection extends ShutdownNotifier { // rename to AMQPConnecti
121121
Channel createChannel(int channelNumber) throws IOException;
122122

123123
/**
124-
* Close this connection and all its channels.
124+
* Close this connection and all its channels
125+
* with the {@link com.rabbitmq.client.AMQP#REPLY_SUCCESS} close code
126+
* and message 'OK'.
125127
*
126-
* This method will wait infinitely for all the close operations to
127-
* complete.
128+
* Waits for all the close operations to complete.
128129
*
129130
* @throws IOException if an I/O problem is encountered
130131
*/
131132
void close() throws IOException;
133+
134+
/**
135+
* Close this connection and all its channels.
136+
*
137+
* Waits for all the close operations to complete.
138+
*
139+
* @param closeCode the close code (See under "Reply Codes" in the AMQP specification)
140+
* @param closeMessage a message indicating the reason for closing the connection
141+
* @throws IOException if an I/O problem is encountered
142+
*/
143+
void close(int closeCode, String closeMessage) throws IOException;
132144

133145
/**
134146
* Close this connection and all its channels
135-
*
136-
* This method will wait with the given timeout for all the close
137-
* operations to complete. If timeout is reached then socket is forced
138-
* to close
139-
* @param timeout timeout (in milioseconds) for completing all the close-related
147+
* with the {@link com.rabbitmq.client.AMQP#REPLY_SUCCESS} close code
148+
* and message 'OK'.
149+
*
150+
* This method behaves in a similar way as {@link #close()}, with the only difference
151+
* that it waits with a provided timeout for all the close operations to
152+
* complete. When timeout is reached the socket is forced to close.
153+
*
154+
* @param timeout timeout (in milliseconds) for completing all the close-related
140155
* operations, use -1 for infinity
141156
* @throws IOException if an I/O problem is encountered
142157
*/
143158
void close(int timeout) throws IOException;
159+
160+
/**
161+
* Close this connection and all its channels.
162+
*
163+
* Waits with the given timeout for all the close operations to complete.
164+
* When timeout is reached the socket is forced to close.
165+
*
166+
* @param closeCode the close code (See under "Reply Codes" in the AMQP specification)
167+
* @param closeMessage a message indicating the reason for closing the connection
168+
* @param timeout timeout (in milliseconds) for completing all the close-related
169+
* operations, use -1 for infinity
170+
* @throws IOException if an I/O problem is encountered
171+
*/
172+
void close(int closeCode, String closeMessage, int timeout) throws IOException;
144173

145174
/**
146-
* Abort this connection and all its channels.
175+
* Abort this connection and all its channels
176+
* with the {@link com.rabbitmq.client.AMQP#REPLY_SUCCESS} close code
177+
* and message 'OK'.
147178
*
148-
* This method will force the connection to close. It will silently discard
149-
* any exceptions enountered in close operations
179+
* Forces the connection to close.
180+
* Any encountered exceptions in the close operations are silently discarded.
150181
*/
151182
void abort();
152-
183+
153184
/**
154185
* Abort this connection and all its channels.
155186
*
156-
* This method behaves in a similar way as abort(), with the only difference
157-
* that it will wait with a provided timeout for all the close operations to
158-
* complete. If timeout is reached socket is forced to close.
187+
* Forces the connection to close and waits for all the close operations to complete.
188+
* Any encountered exceptions in the close operations are silently discarded.
189+
*
190+
* @param closeCode the close code (See under "Reply Codes" in the AMQP specification)
191+
* @param closeMessage a message indicating the reason for closing the connection
192+
*/
193+
void abort(int closeCode, String closeMessage);
194+
195+
/**
196+
* Abort this connection and all its channels
197+
* with the {@link com.rabbitmq.client.AMQP#REPLY_SUCCESS} close code
198+
* and message 'OK'.
199+
*
200+
* This method behaves in a similar way as {@link #abort()}, with the only difference
201+
* that it waits with a provided timeout for all the close operations to
202+
* complete. When timeout is reached the socket is forced to close.
159203
*
160-
* @param timeout timeout (in miliseconds) for completing all the close-related
204+
* @param timeout timeout (in milliseconds) for completing all the close-related
161205
* operations, use -1 for infinity
162206
*/
163207
void abort(int timeout);
208+
209+
/**
210+
* Abort this connection and all its channels.
211+
*
212+
* Forces the connection to close and waits with the given timeout
213+
* for all the close operations to complete. When timeout is reached
214+
* the socket is forced to close.
215+
* Any encountered exceptions in the close operations are silently discarded.
216+
*
217+
* @param closeCode the close code (See under "Reply Codes" in the AMQP specification)
218+
* @param closeMessage a message indicating the reason for closing the connection
219+
* @param timeout timeout (in milliseconds) for completing all the close-related
220+
* operations, use -1 for infinity
221+
*/
222+
void abort(int closeCode, String closeMessage, int timeout);
164223
}

src/com/rabbitmq/client/impl/AMQChannel.java

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -219,17 +219,29 @@ public synchronized void quiescingRpc(Method m, RpcContinuation k)
219219
@Override public String toString() {
220220
return "AMQChannel(" + _connection + "," + _channelNumber + ")";
221221
}
222-
222+
223223
/**
224224
* Protected API - respond, in the driver thread, to a {@link ShutdownSignalException}.
225225
* @param signal the signal to handle
226+
* @param ignoreClosed the flag indicating whether to ignore the AlreadyClosedException
227+
* thrown when the channel is already closed
228+
* @param notifyRpc the flag indicating whether any remaining rpc continuation should be
229+
* notified with the given signal
226230
*/
227-
public void processShutdownSignal(ShutdownSignalException signal) {
228-
synchronized (this) {
229-
ensureIsOpen(); // invariant: we should never be shut down more than once per instance
230-
_shutdownCause = signal;
231+
public void processShutdownSignal(ShutdownSignalException signal,
232+
boolean ignoreClosed,
233+
boolean notifyRpc) {
234+
try {
235+
synchronized (this) {
236+
if (!ignoreClosed)
237+
ensureIsOpen(); // invariant: we should never be shut down more than once per instance
238+
if (isOpen())
239+
_shutdownCause = signal;
240+
}
241+
} finally {
242+
if (notifyRpc)
243+
notifyOutstandingRpc(signal);
231244
}
232-
notifyOutstandingRpc(signal);
233245
}
234246

235247
public void notifyOutstandingRpc(ShutdownSignalException signal) {

0 commit comments

Comments
 (0)