Skip to content

Commit c3a31af

Browse files
author
Alexandru Scvortov
committed
merge default into bug21569
2 parents bad0546 + 66a4a5e commit c3a31af

File tree

71 files changed

+2063
-400
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

71 files changed

+2063
-400
lines changed

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ SRC_ARCHIVE=$(PACKAGE_NAME)-$(VERSION)
77
SIGNING_KEY=056E8E56
88
GNUPG_PATH=~
99

10-
WEB_URL=http://stage.rabbitmq.com/
10+
WEB_URL=http://www.rabbitmq.com/
1111
NEXUS_STAGE_URL=http://oss.sonatype.org/service/local/staging/deploy/maven2
1212

1313
AMQP_CODEGEN_DIR=$(shell fgrep sibling.codegen.dir build.properties | sed -e 's:sibling\.codegen\.dir=::')

build.xml

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
<pathelement path="${test.javac.out}"/>
2727
</path>
2828

29-
<property name="AMQP_SPEC_JSON_PATH" value="${codegen.dir}/amqp-${spec.version}.json"/>
29+
<property name="AMQP_SPEC_JSON_PATH" value="${codegen.dir}/amqp-${spec.version}.json ${codegen.dir}/rabbitmq-0.8-extensions.json"/>
3030

3131
<target name="amqp-generate-check" description="check if codegen needs to be run">
3232
<uptodate property="amqp.generate.notRequired">
@@ -392,6 +392,19 @@
392392
</junit>
393393
</target>
394394

395+
<target name="test-single" depends="test-build">
396+
<junit printSummary="withOutAndErr"
397+
haltOnFailure="${haltOnFailureJunit}"
398+
failureproperty="test.failure"
399+
fork="yes">
400+
<classpath refid="test.classpath"/>
401+
402+
<formatter type="plain"/>
403+
<formatter type="xml"/>
404+
<test todir="${build.out}" name="${test}"/>
405+
</junit>
406+
</target>
407+
395408
<target name="jar" depends="build">
396409
<mkdir dir="${lib.out}"/>
397410
<antcall target="doJarWithTags">

nexus-upload.sh

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -9,19 +9,20 @@ set -e
99
# GNUPG_PATH -- the path to the home directory for gnupg
1010

1111
NEXUS_ROOT="http://$CREDS@oss.sonatype.org/service/local/staging/deploy/maven2/com/rabbitmq/amqp-client/$VERSION"
12+
unset http_proxy
13+
unset https_proxy
14+
unset no_proxy
1215

13-
for ARTIFACT_NAME in $@; do
14-
echo "Uploading $ARTIFACT_NAME"
16+
for artifact in $@; do
17+
echo "Uploading $artifact"
1518

16-
rm -f $ARTIFACT_NAME.asc
17-
gpg --homedir $GNUPG_PATH/.gnupg --local-user $SIGNING_KEY --no-tty --armor --detach-sign --output $ARTIFACT_NAME.asc $ARTIFACT_NAME
18-
md5sum $ARTIFACT_NAME | cut -f1 -d' ' >$ARTIFACT_NAME.md5
19-
md5sum $ARTIFACT_NAME.asc | cut -f1 -d' ' >$ARTIFACT_NAME.asc.md5
20-
sha1sum $ARTIFACT_NAME | cut -f1 -d' ' >$ARTIFACT_NAME.sha1
21-
sha1sum $ARTIFACT_NAME.asc | cut -f1 -d' ' >$ARTIFACT_NAME.asc.sha1
22-
curl -XPUT --data-binary @$ARTIFACT_NAME $NEXUS_ROOT/$ARTIFACT_NAME
23-
24-
for EXT in md5 sha1 asc asc.md5 asc.sha1; do
25-
curl -XPUT --data-binary @$ARTIFACT_NAME.$EXT $NEXUS_ROOT/$ARTIFACT_NAME.$EXT
19+
rm -f $artifact.asc
20+
gpg --homedir $GNUPG_PATH/.gnupg --local-user $SIGNING_KEY --no-tty --armor --detach-sign --output $artifact.asc $artifact
21+
for ext in '' .asc ; do
22+
curl -XPUT --data-binary @$artifact$ext $NEXUS_ROOT/$artifact$ext
23+
for sum in md5 sha1 ; do
24+
${sum}sum $artifact$ext | (read a rest ; echo -n "$a") >$artifact$ext.$sum
25+
curl -XPUT --data-binary @$artifact$ext.$sum $NEXUS_ROOT/$artifact$ext.$sum
26+
done
2627
done
2728
done

src/com/rabbitmq/client/Channel.java

Lines changed: 115 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@
3737
import com.rabbitmq.client.AMQP.Exchange;
3838
import com.rabbitmq.client.AMQP.Queue;
3939
import com.rabbitmq.client.AMQP.Tx;
40+
import com.rabbitmq.client.AMQP.Basic;
41+
import com.rabbitmq.client.AMQP.Channel.FlowOk;
4042

4143
/**
4244
* Public API: Interface to an AMQ channel. See the <a href="http://www.amqp.org/">spec</a> for details.
@@ -91,6 +93,19 @@ public interface Channel extends ShutdownNotifier {
9193
*/
9294
void close(int closeCode, String closeMessage) throws IOException;
9395

96+
/**
97+
* Set flow on the channel
98+
*
99+
* @param active if true, the server is asked to start sending. If false, the server is asked to stop sending.
100+
* @throws IOException
101+
*/
102+
FlowOk flow(boolean active) throws IOException;
103+
104+
/**
105+
* Return the current Channel.Flow settings.
106+
*/
107+
FlowOk getFlow();
108+
94109
/**
95110
* Abort this channel with the {@link com.rabbitmq.client.AMQP#REPLY_SUCCESS} close code
96111
* and message 'OK'.
@@ -120,6 +135,52 @@ public interface Channel extends ShutdownNotifier {
120135
*/
121136
void setReturnListener(ReturnListener listener);
122137

138+
/**
139+
* Return the current {@link FlowListener}.
140+
* @return an interface to the current flow listener.
141+
*/
142+
FlowListener getFlowListener();
143+
144+
/**
145+
* Set the current {@link FlowListener}.
146+
* @param listener the listener to use, or null indicating "don't use one".
147+
*/
148+
void setFlowListener(FlowListener listener);
149+
150+
/**
151+
* Get the current default consumer. @see setDefaultConsumer for rationale.
152+
* @return an interface to the current default consumer.
153+
*/
154+
Consumer getDefaultConsumer();
155+
156+
/**
157+
* Set the current default consumer.
158+
*
159+
* Under certain circumstances it is possible for a channel to receive a
160+
* message delivery which does not match any consumer which is currently
161+
* set up via basicConsume(). This will occur after the following sequence
162+
* of events:
163+
*
164+
* ctag = basicConsume(queue, consumer); // i.e. with explicit acks
165+
* // some deliveries take place but are not acked
166+
* basicCancel(ctag);
167+
* basicRecover(false);
168+
*
169+
* Since requeue is specified to be false in the basicRecover, the spec
170+
* states that the message must be redelivered to "the original recipient"
171+
* - i.e. the same channel / consumer-tag. But the consumer is no longer
172+
* active.
173+
*
174+
* In these circumstances, you can register a default consumer to handle
175+
* such deliveries. If no default consumer is registered an
176+
* IllegalStateException will be thrown when such a delivery arrives.
177+
*
178+
* Most people will not need to use this.
179+
*
180+
* @param consumer the consumer to use, or null indicating "don't use one".
181+
*/
182+
void setDefaultConsumer(Consumer consumer);
183+
123184
/**
124185
* Request specific "quality of service" settings.
125186
*
@@ -174,38 +235,17 @@ public interface Channel extends ShutdownNotifier {
174235
void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body)
175236
throws IOException;
176237

177-
/**
178-
* Delete an exchange, without regard for whether it is in use or not
179-
* @see com.rabbitmq.client.AMQP.Exchange.Delete
180-
* @see com.rabbitmq.client.AMQP.Exchange.DeleteOk
181-
* @param exchange the name of the exchange
182-
* @return a deletion-confirm method to indicate the exchange was successfully deleted
183-
* @throws java.io.IOException if an error is encountered
184-
*/
185-
Exchange.DeleteOk exchangeDelete(String exchange) throws IOException;
186-
187238
/**
188239
* Actively declare a non-autodelete, non-durable exchange with no extra arguments
189240
* @see com.rabbitmq.client.AMQP.Exchange.Declare
190241
* @see com.rabbitmq.client.AMQP.Exchange.DeclareOk
191242
* @param exchange the name of the exchange
192243
* @param type the exchange type
193-
* @return a deletion-confirm method to indicate the exchange was successfully deleted
244+
* @return a declaration-confirm method to indicate the exchange was successfully declared
194245
* @throws java.io.IOException if an error is encountered
195246
*/
196247
Exchange.DeclareOk exchangeDeclare(String exchange, String type) throws IOException;
197248

198-
/**
199-
* Delete an exchange
200-
* @see com.rabbitmq.client.AMQP.Exchange.Delete
201-
* @see com.rabbitmq.client.AMQP.Exchange.DeleteOk
202-
* @param exchange the name of the exchange
203-
* @param ifUnused true to indicate that the exchange is only to be deleted if it is unused
204-
* @return a deletion-confirm method to indicate the exchange was successfully deleted
205-
* @throws java.io.IOException if an error is encountered
206-
*/
207-
Exchange.DeleteOk exchangeDelete(String exchange, boolean ifUnused) throws IOException;
208-
209249
/**
210250
* Actively declare a non-autodelete exchange with no extra arguments
211251
* @see com.rabbitmq.client.AMQP.Exchange.Declare
@@ -219,70 +259,87 @@ void basicPublish(String exchange, String routingKey, boolean mandatory, boolean
219259
Exchange.DeclareOk exchangeDeclare(String exchange, String type, boolean durable) throws IOException;
220260

221261
/**
222-
* Declare an exchange, via an interface that allows the complete set of arguments
223-
* The name of the new queue is held in the "queue" field of the {@link com.rabbitmq.client.AMQP.Queue.DeclareOk} result.
262+
* Declare an exchange, via an interface that allows the complete set of arguments.
224263
* @see com.rabbitmq.client.AMQP.Exchange.Declare
225264
* @see com.rabbitmq.client.AMQP.Exchange.DeclareOk
226265
* @param exchange the name of the exchange
227266
* @param type the exchange type
228-
* @param passive true if we are passively declaring a exchange (asserting the exchange already exists)
229267
* @param durable true if we are declaring a durable exchange (the exchange will survive a server restart)
230268
* @param autoDelete true if the server should delete the exchange when it is no longer in use
231269
* @param arguments other properties (construction arguments) for the exchange
232270
* @return a declaration-confirm method to indicate the exchange was successfully declared
233271
* @throws java.io.IOException if an error is encountered
234272
*/
235-
Exchange.DeclareOk exchangeDeclare(String exchange, String type, boolean passive, boolean durable, boolean autoDelete,
273+
Exchange.DeclareOk exchangeDeclare(String exchange, String type, boolean durable, boolean autoDelete,
236274
Map<String, Object> arguments) throws IOException;
237275

238276
/**
239-
* Actively declare a server-named exclusive, autodelete, non-durable queue.
240-
* The name of the new queue is held in the "queue" field of the {@link com.rabbitmq.client.AMQP.Queue.DeclareOk} result.
241-
* @see com.rabbitmq.client.AMQP.Queue.Declare
242-
* @see com.rabbitmq.client.AMQP.Queue.DeclareOk
243-
* @return a declaration-confirm method to indicate the exchange was successfully declared
277+
* Declare an exchange passively; that is, check if the named exchange exists.
278+
* @param name check the existence of an exchange named this
279+
* @throws IOException the server will raise a 404 channel exception if the named exchange does not exist.
280+
*/
281+
Exchange.DeclareOk exchangeDeclarePassive(String name) throws IOException;
282+
283+
/**
284+
* Delete an exchange
285+
* @see com.rabbitmq.client.AMQP.Exchange.Delete
286+
* @see com.rabbitmq.client.AMQP.Exchange.DeleteOk
287+
* @param exchange the name of the exchange
288+
* @param ifUnused true to indicate that the exchange is only to be deleted if it is unused
289+
* @return a deletion-confirm method to indicate the exchange was successfully deleted
244290
* @throws java.io.IOException if an error is encountered
245291
*/
246-
Queue.DeclareOk queueDeclare() throws IOException;
292+
Exchange.DeleteOk exchangeDelete(String exchange, boolean ifUnused) throws IOException;
247293

248294
/**
249-
* Actively declare a non-exclusive, non-autodelete, non-durable queue
250-
* @see com.rabbitmq.client.AMQP.Queue.Declare
251-
* @see com.rabbitmq.client.AMQP.Queue.DeclareOk
252-
* @param queue the name of the queue
253-
* @return a declaration-confirm method to indicate the queue was successfully declared
295+
* Delete an exchange, without regard for whether it is in use or not
296+
* @see com.rabbitmq.client.AMQP.Exchange.Delete
297+
* @see com.rabbitmq.client.AMQP.Exchange.DeleteOk
298+
* @param exchange the name of the exchange
299+
* @return a deletion-confirm method to indicate the exchange was successfully deleted
254300
* @throws java.io.IOException if an error is encountered
255301
*/
256-
Queue.DeclareOk queueDeclare(String queue) throws IOException;
302+
Exchange.DeleteOk exchangeDelete(String exchange) throws IOException;
257303

258304
/**
259-
* Actively declare a non-exclusive, non-autodelete queue
305+
* Actively declare a server-named exclusive, autodelete, non-durable queue.
260306
* The name of the new queue is held in the "queue" field of the {@link com.rabbitmq.client.AMQP.Queue.DeclareOk} result.
261307
* @see com.rabbitmq.client.AMQP.Queue.Declare
262308
* @see com.rabbitmq.client.AMQP.Queue.DeclareOk
263-
* @param queue the name of the queue
264-
* @param durable true if we are declaring a durable exchange (the exchange will survive a server restart)
265-
* @return a declaration-confirm method to indicate the exchange was successfully declared
309+
* @return a declaration-confirm method to indicate the queue was successfully declared
266310
* @throws java.io.IOException if an error is encountered
267311
*/
268-
Queue.DeclareOk queueDeclare(String queue, boolean durable) throws IOException;
312+
Queue.DeclareOk queueDeclare() throws IOException;
269313

270314
/**
271315
* Declare a queue
272316
* @see com.rabbitmq.client.AMQP.Queue.Declare
273317
* @see com.rabbitmq.client.AMQP.Queue.DeclareOk
274318
* @param queue the name of the queue
275-
* @param passive true if we are passively declaring a queue (asserting the queue already exists)
276319
* @param durable true if we are declaring a durable queue (the queue will survive a server restart)
277-
* @param exclusive true if we are declaring an exclusive queue
320+
* @param exclusive true if we are declaring an exclusive queue (restricted to this connection)
278321
* @param autoDelete true if we are declaring an autodelete queue (server will delete it when no longer in use)
279322
* @param arguments other properties (construction arguments) for the queue
280323
* @return a declaration-confirm method to indicate the queue was successfully declared
281324
* @throws java.io.IOException if an error is encountered
282325
*/
283-
Queue.DeclareOk queueDeclare(String queue, boolean passive, boolean durable, boolean exclusive, boolean autoDelete,
326+
Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,
284327
Map<String, Object> arguments) throws IOException;
285328

329+
/**
330+
* Declare a queue passively; i.e., check if it exists. In AMQP
331+
* 0-9-1, all arguments aside from nowait are ignored; and sending
332+
* nowait makes this method a no-op, so we default it to false.
333+
* @see com.rabbitmq.client.AMQP.Queue.Declare
334+
* @see com.rabbitmq.client.AMQP.Queue.DeclareOk
335+
* @param queue the name of the queue
336+
* @return a declaration-confirm method to indicate the queue exists
337+
* @throws java.io.IOException if an error is encountered,
338+
* including if the queue does not exist and if the queue is
339+
* exclusively owned by another connection.
340+
*/
341+
Queue.DeclareOk queueDeclarePassive(String queue) throws IOException;
342+
286343
/**
287344
* Delete a queue, without regard for whether it is in use or has messages on it
288345
* @see com.rabbitmq.client.AMQP.Queue.Delete
@@ -472,12 +529,22 @@ Queue.DeclareOk queueDeclare(String queue, boolean passive, boolean durable, boo
472529
* Ask the broker to resend unacknowledged messages. In 0-8
473530
* basic.recover is asynchronous; in 0-9-1 it is synchronous, and
474531
* the new, deprecated method basic.recover_async is asynchronous.
475-
* To avoid this API changing, this is named for the latter, and
476-
* will be deprecated.
477532
* @param requeue If true, messages will be requeued and possibly
478533
* delivered to a different consumer. If false, messages will be
479534
* redelivered to the same consumer.
480535
*/
536+
Basic.RecoverOk basicRecover(boolean requeue) throws IOException;
537+
538+
/**
539+
* Ask the broker to resend unacknowledged messages. In 0-8
540+
* basic.recover is asynchronous; in 0-9-1 it is synchronous, and
541+
* the new, deprecated method basic.recover_async is asynchronous
542+
* and deprecated.
543+
* @param requeue If true, messages will be requeued and possibly
544+
* delivered to a different consumer. If false, messages will be
545+
* redelivered to the same consumer.
546+
*/
547+
@Deprecated
481548
void basicRecoverAsync(boolean requeue) throws IOException;
482549

483550
/**

src/com/rabbitmq/client/Connection.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
package com.rabbitmq.client;
3232

3333
import java.io.IOException;
34+
import java.util.Map;
3435

3536
/**
3637
* Public API: Interface to an AMQ connection. See the see the <a href="http://www.amqp.org/">spec</a> for details.
@@ -39,7 +40,8 @@
3940
*
4041
* <pre>
4142
* ConnectionFactory factory = new ConnectionFactory();
42-
* factory.setHostName(hostName);
43+
* factory.setHost(hostName);
44+
* factory.setPort(portNumber);
4345
* factory.setVirtualHost(virtualHost);
4446
* factory.setUsername(username);
4547
* factory.setPassword(password);
@@ -89,12 +91,25 @@ public interface Connection extends ShutdownNotifier { // rename to AMQPConnecti
8991
*/
9092
int getHeartbeat();
9193

94+
/**
95+
* Get a copy of the map of client properties sent to the server
96+
*
97+
* @return a copy of the map of client properties
98+
*/
99+
Map<String, Object> getClientProperties();
100+
92101
/**
93102
* Retrieve the known hosts.
94103
* @return an array of addresses for all hosts that came back in the initial {@link com.rabbitmq.client.AMQP.Connection.OpenOk} open-ok method
95104
*/
96105
Address[] getKnownHosts();
97106

107+
/**
108+
* Retrieve the server properties.
109+
* @return a map of the server properties. This typically includes the product name and version of the server.
110+
*/
111+
Map<String, Object> getServerProperties();
112+
98113
/**
99114
* Create a new channel, using an internally allocated channel number.
100115
* @return a new channel descriptor, or null if none is available

0 commit comments

Comments
 (0)