Skip to content

Commit 510c914

Browse files
author
Alexandru Scvortov
committed
pull from default
2 parents 70f43f7 + e7f9a91 commit 510c914

File tree

10 files changed

+152
-144
lines changed

10 files changed

+152
-144
lines changed

src/com/rabbitmq/client/Channel.java

Lines changed: 3 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -425,26 +425,15 @@ Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, b
425425
*/
426426
Queue.UnbindOk queueUnbind(String queue, String exchange, String routingKey, Map<String, Object> arguments) throws IOException;
427427

428-
/**
429-
* Purges the contents of the given queue and awaits a completion.
430-
* @see com.rabbitmq.client.AMQP.Queue.Purge
431-
* @see com.rabbitmq.client.AMQP.Queue.PurgeOk
432-
* @param queue the name of the queue
433-
* @return a purge-confirm method if the purge was executed succesfully
434-
* @throws java.io.IOException if an error is encountered
435-
*/
436-
Queue.PurgeOk queuePurge(String queue) throws IOException;
437-
438428
/**
439429
* Purges the contents of the given queue.
440430
* @see com.rabbitmq.client.AMQP.Queue.Purge
441431
* @see com.rabbitmq.client.AMQP.Queue.PurgeOk
442432
* @param queue the name of the queue
443-
* @param nowait whether to await completion of the purge
444433
* @return a purge-confirm method if the purge was executed succesfully
445434
* @throws java.io.IOException if an error is encountered
446435
*/
447-
Queue.PurgeOk queuePurge(String queue, boolean nowait) throws IOException;
436+
Queue.PurgeOk queuePurge(String queue) throws IOException;
448437

449438
/**
450439
* Retrieve a message from a queue using {@link com.rabbitmq.client.AMQP.Basic.Get}
@@ -532,12 +521,13 @@ Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, b
532521
* @param noLocal flag set to true unless server local buffering is required
533522
* @param exclusive true if this is an exclusive consumer
534523
* @param callback an interface to the consumer object
524+
* @param arguments a set of arguments for the consume
535525
* @return the consumerTag associated with the new consumer
536526
* @throws java.io.IOException if an error is encountered
537527
* @see com.rabbitmq.client.AMQP.Basic.Consume
538528
* @see com.rabbitmq.client.AMQP.Basic.ConsumeOk
539529
*/
540-
String basicConsume(String queue, boolean noAck, String consumerTag, boolean noLocal, boolean exclusive, Map<String, Object> filter, Consumer callback) throws IOException;
530+
String basicConsume(String queue, boolean noAck, String consumerTag, boolean noLocal, boolean exclusive, Map<String, Object> arguments, Consumer callback) throws IOException;
541531

542532
/**
543533
* Cancel a consumer. Calls the consumer's {@link Consumer#handleCancelOk}

src/com/rabbitmq/client/impl/ChannelN.java

Lines changed: 3 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -634,16 +634,9 @@ public Queue.UnbindOk queueUnbind(String queue, String exchange, String routingK
634634
/** Public API - {@inheritDoc} */
635635
public Queue.PurgeOk queuePurge(String queue)
636636
throws IOException
637-
{
638-
return queuePurge(queue, false);
639-
}
640-
641-
/** Public API - {@inheritDoc} */
642-
public Queue.PurgeOk queuePurge(String queue, boolean nowait)
643-
throws IOException
644637
{
645638
return (Queue.PurgeOk)
646-
exnWrappingRpc(new Queue.Purge(TICKET, queue, nowait)).getMethod();
639+
exnWrappingRpc(new Queue.Purge(TICKET, queue, false)).getMethod();
647640
}
648641

649642
/** Public API - {@inheritDoc} */
@@ -715,7 +708,7 @@ public String basicConsume(String queue, boolean noAck, String consumerTag,
715708

716709
/** Public API - {@inheritDoc} */
717710
public String basicConsume(String queue, boolean noAck, String consumerTag,
718-
boolean noLocal, boolean exclusive, Map<String, Object> filter,
711+
boolean noLocal, boolean exclusive, Map<String, Object> arguments,
719712
final Consumer callback)
720713
throws IOException
721714
{
@@ -740,7 +733,7 @@ public String transformReply(AMQCommand replyCommand) {
740733

741734
rpc(new Basic.Consume(TICKET, queue, consumerTag,
742735
noLocal, noAck, exclusive,
743-
false, filter),
736+
false, arguments),
744737
k);
745738

746739
try {

test/src/com/rabbitmq/client/test/functional/BindingLifecycle.java

Lines changed: 3 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -137,8 +137,8 @@ public void testExchangeIfUnused() throws IOException {
137137
try {
138138
channel.exchangeDelete(binding.x, true);
139139
}
140-
catch (Exception e) {
141-
// do nothing, this is the correct behaviour
140+
catch (IOException e) {
141+
checkShutdownSignal(AMQP.PRECONDITION_FAILED, e);
142142
openChannel();
143143
deleteExchangeAndQueue(binding);
144144
return;
@@ -193,17 +193,7 @@ public void testExchangePassiveDeclare() throws IOException {
193193
fail("Passive declare of an unknown exchange should fail");
194194
}
195195
catch (IOException ioe) {
196-
Throwable t = ioe.getCause();
197-
String msg = "Passive declare of an unknown exchange should send a 404";
198-
assertTrue(msg, t instanceof ShutdownSignalException);
199-
Object r = ((ShutdownSignalException)t).getReason();
200-
assertTrue(msg, r instanceof Command);
201-
Method m = ((Command)r).getMethod();
202-
assertTrue(msg, m instanceof AMQP.Channel.Close);
203-
assertEquals(msg,
204-
AMQP.NOT_FOUND,
205-
((AMQP.Channel.Close)m).getReplyCode());
206-
return;
196+
checkShutdownSignal(AMQP.NOT_FOUND, ioe);
207197
}
208198
}
209199

test/src/com/rabbitmq/client/test/functional/BindingLifecycleBase.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131

3232
package com.rabbitmq.client.test.functional;
3333

34+
import com.rabbitmq.client.AMQP;
3435
import com.rabbitmq.client.GetResponse;
3536
import com.rabbitmq.client.QueueingConsumer;
3637
import java.io.IOException;
@@ -104,8 +105,8 @@ protected void doAutoDelete(boolean durable, int queues) throws IOException {
104105
channel.queueBind(binding.q, binding.x, binding.k);
105106
sendRoutable(binding);
106107
}
107-
catch (Exception e) {
108-
// do nothing, this is the correct behaviour
108+
catch (IOException e) {
109+
checkShutdownSignal(AMQP.NOT_FOUND, e);
109110
channel = null;
110111
return;
111112
}

test/src/com/rabbitmq/client/test/functional/QueueLease.java

Lines changed: 32 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ public class QueueLease extends BrokerTestCase {
4646

4747
// Currently the expiration timer is very responsive but this may
4848
// very well change in the future, so tweak accordingly.
49-
private final static long QUEUE_EXPIRES = 1000L; // msecs
49+
private final static int QUEUE_EXPIRES = 1000; // msecs
5050
private final static int SHOULD_EXPIRE_WITHIN = 2000;
5151

5252
/**
@@ -65,27 +65,42 @@ public void testDoesNotExpireOthers() throws IOException,
6565
verifyQueueExpires(TEST_NORMAL_QUEUE, false);
6666
}
6767

68-
/**
69-
* Verify that the server throws an error if the type of x-expires is not
70-
* long.
71-
*/
72-
public void testExpireMustBeLong() throws IOException {
68+
public void testExpireMayBeByte() throws IOException {
7369
Map<String, Object> args = new HashMap<String, Object>();
74-
args.put("x-expires", 100);
70+
args.put("x-expires", (byte)100);
7571

7672
try {
77-
channel
78-
.queueDeclare("expiresMustBeLong", false, false, false,
79-
args);
80-
fail("server accepted x-expires not of type long");
73+
channel.queueDeclare("expiresMayBeByte", false, true, false, args);
8174
} catch (IOException e) {
82-
checkShutdownSignal(AMQP.PRECONDITION_FAILED, e);
75+
fail("server did not accept x-expires of type byte");
76+
}
77+
}
78+
79+
public void testExpireMayBeShort() throws IOException {
80+
Map<String, Object> args = new HashMap<String, Object>();
81+
args.put("x-expires", (short)100);
82+
83+
try {
84+
channel.queueDeclare("expiresMayBeShort", false, true, false, args);
85+
} catch (IOException e) {
86+
fail("server did not accept x-expires of type short");
87+
}
88+
}
89+
90+
public void testExpireMayBeLong() throws IOException {
91+
Map<String, Object> args = new HashMap<String, Object>();
92+
args.put("x-expires", 100L);
93+
94+
try {
95+
channel.queueDeclare("expiresMayBeLong", false, true, false, args);
96+
} catch (IOException e) {
97+
fail("server did not accept x-expires of type long");
8398
}
8499
}
85100

86101
public void testExpireMustBeGtZero() throws IOException {
87102
Map<String, Object> args = new HashMap<String, Object>();
88-
args.put("x-expires", 0L);
103+
args.put("x-expires", 0);
89104

90105
try {
91106
channel.queueDeclare("expiresMustBeGtZero", false, false, false,
@@ -98,7 +113,7 @@ public void testExpireMustBeGtZero() throws IOException {
98113

99114
public void testExpireMustBePositive() throws IOException {
100115
Map<String, Object> args = new HashMap<String, Object>();
101-
args.put("x-expires", -10L);
116+
args.put("x-expires", -10);
102117

103118
try {
104119
channel.queueDeclare("expiresMustBePositive", false, false, false,
@@ -115,9 +130,9 @@ public void testExpireMustBePositive() throws IOException {
115130
*/
116131
public void testQueueRedeclareEquivalence() throws IOException {
117132
Map<String, Object> args1 = new HashMap<String, Object>();
118-
args1.put("x-expires", 10000L);
133+
args1.put("x-expires", 10000);
119134
Map<String, Object> args2 = new HashMap<String, Object>();
120-
args2.put("x-expires", 20000L);
135+
args2.put("x-expires", 20000);
121136

122137
channel.queueDeclare(TEST_EXPIRE_REDECLARE_QUEUE, false, false, false,
123138
args1);
@@ -145,6 +160,7 @@ void verifyQueueExpires(String name, boolean expire) throws IOException,
145160
try {
146161
channel.queueDeclarePassive(name);
147162
} catch (IOException e) {
163+
checkShutdownSignal(AMQP.NOT_FOUND, e);
148164
fail("Queue expired before deadline.");
149165
}
150166

test/src/com/rabbitmq/client/test/functional/QueueLifecycle.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,7 @@ public void testQueueAutoDelete() throws IOException {
139139
try {
140140
verifyQueueExists(name);
141141
} catch (IOException ioe) {
142+
checkShutdownSignal(AMQP.NOT_FOUND, ioe);
142143
return;
143144
}
144145
fail("Queue should have been auto-deleted after we removed its only consumer");

test/src/com/rabbitmq/client/test/functional/TransactionsBase.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -248,8 +248,9 @@ public void testUnknownTagAck()
248248
txCommit();
249249
fail("expected exception");
250250
}
251-
catch (IOException e) {}
252-
catch (AlreadyClosedException e) {}
251+
catch (IOException e) {
252+
checkShutdownSignal(AMQP.PRECONDITION_FAILED, e);
253+
}
253254
connection = null;
254255
openConnection();
255256
}

test/src/com/rabbitmq/client/test/functional/UnexpectedFrames.java

Lines changed: 56 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -13,13 +13,53 @@
1313
* Test that the server correctly handles us when we send it bad frames
1414
*/
1515
public class UnexpectedFrames extends BrokerTestCase {
16+
1617
private interface Confuser {
1718
public Frame confuse(Frame frame) throws IOException;
1819
}
1920

20-
@Override protected void setUp() throws IOException {}
21+
private static class ConfusedFrameHandler extends SocketFrameHandler {
22+
23+
private boolean confusedOnce = false;
24+
25+
public ConfusedFrameHandler(Socket socket) throws IOException {
26+
super(socket);
27+
}
28+
29+
@Override
30+
public void writeFrame(Frame frame) throws IOException {
31+
if (confusedOnce) {
32+
super.writeFrame(frame);
33+
} else {
34+
Frame confusedFrame = confuser.confuse(frame);
35+
if (confusedFrame != frame) confusedOnce = true;
36+
if (confusedFrame != null) {
37+
super.writeFrame(confusedFrame);
38+
}
39+
}
40+
}
41+
42+
public Confuser confuser = new Confuser() {
43+
public Frame confuse(Frame frame) {
44+
// Do nothing to start with, we need to negotiate before the
45+
// server will send us unexpected_frame errors
46+
return frame;
47+
}
48+
};
49+
}
2150

22-
@Override protected void tearDown() throws IOException {}
51+
private static class ConfusedConnectionFactory extends ConnectionFactory {
52+
53+
@Override protected FrameHandler createFrameHandler(Socket sock)
54+
throws IOException {
55+
return new ConfusedFrameHandler(sock);
56+
}
57+
}
58+
59+
public UnexpectedFrames() {
60+
super();
61+
connectionFactory = new ConfusedConnectionFactory();
62+
}
2363

2464
public void testMissingHeader() throws IOException {
2565
expectUnexpectedFrameError(new Confuser() {
@@ -38,7 +78,11 @@ public Frame confuse(Frame frame) {
3878
if (frame.type == AMQP.FRAME_METHOD) {
3979
// We can't just skip the method as that will lead us to
4080
// send 0 bytes and hang waiting for a response.
41-
frame.type = AMQP.FRAME_HEADER;
81+
Frame confusedFrame = new Frame(AMQP.FRAME_HEADER,
82+
frame.channel,
83+
frame.payload);
84+
confusedFrame.accumulator = frame.accumulator;
85+
return confusedFrame;
4286
}
4387
return frame;
4488
}
@@ -73,53 +117,16 @@ public Frame confuse(Frame frame) throws IOException {
73117
});
74118
}
75119

76-
private void expectUnexpectedFrameError(Confuser confuser) throws IOException {
77-
ConnectionFactory factory = new ConnectionFactory();
78-
Socket socket = factory.getSocketFactory().createSocket("localhost",
79-
AMQP.PROTOCOL.PORT);
80-
ConfusedFrameHandler handler = new ConfusedFrameHandler(socket);
81-
AMQConnection connection = new AMQConnection(factory, handler);
82-
connection.start();
83-
Channel channel = connection.createChannel();
84-
85-
handler.confuser = confuser;
86-
87-
try {
88-
//NB: the frame confuser relies on the encoding of the
89-
//method field to be at least 8 bytes long
90-
channel.basicPublish("", "routing key", null, "Hello".getBytes());
91-
channel.basicQos(0);
92-
fail("We should have seen an UNEXPECTED_FRAME by now");
93-
}
94-
catch (IOException e) {
95-
checkShutdownSignal(AMQP.UNEXPECTED_FRAME, e);
96-
}
97-
}
120+
private void expectUnexpectedFrameError(Confuser confuser)
121+
throws IOException {
98122

99-
private static class ConfusedFrameHandler extends SocketFrameHandler {
100-
public ConfusedFrameHandler(Socket socket) throws IOException {
101-
super(socket);
102-
}
123+
((ConfusedFrameHandler)((AMQConnection)connection).getFrameHandler()).
124+
confuser = confuser;
103125

104-
@Override
105-
public void writeFrame(Frame frame) throws IOException {
106-
Frame confusedFrame = new Frame();
107-
confusedFrame.accumulator = frame.accumulator;
108-
confusedFrame.channel = frame.channel;
109-
confusedFrame.type = frame.type;
110-
111-
confusedFrame = confuser.confuse(confusedFrame);
112-
if (confusedFrame != null) {
113-
super.writeFrame(confusedFrame);
114-
}
115-
}
116-
117-
public Confuser confuser = new Confuser() {
118-
public Frame confuse(Frame frame) {
119-
// Do nothing to start with, we need to negotiate before the
120-
// server will send us unexpected_frame errors
121-
return frame;
122-
}
123-
};
126+
//NB: the frame confuser relies on the encoding of the
127+
//method field to be at least 8 bytes long
128+
channel.basicPublish("", "routing key", null, "Hello".getBytes());
129+
expectError(AMQP.UNEXPECTED_FRAME);
124130
}
131+
125132
}

test/src/com/rabbitmq/client/test/server/ExclusiveQueueDurability.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import java.io.IOException;
3535
import java.util.HashMap;
3636

37+
import com.rabbitmq.client.AMQP;
3738
import com.rabbitmq.client.Channel;
3839
import com.rabbitmq.client.Connection;
3940
import com.rabbitmq.client.ConnectionFactory;
@@ -55,7 +56,7 @@ void verifyQueueMissing(Channel channel, String queueName)
5556
try {
5657
channel.queueDeclare(queueName, false, false, false, null);
5758
} catch (IOException ioe) {
58-
// FIXME check that it's specifically resource locked
59+
checkShutdownSignal(AMQP.RESOURCE_LOCKED, ioe);
5960
fail("Declaring the queue resulted in a channel exception, probably meaning that it already exists");
6061
}
6162
}

0 commit comments

Comments
 (0)