Skip to content

Commit 82d5f6d

Browse files
author
Alexandru Scvortov
committed
merge default into bug20337
2 parents e7d837e + 9a2fabe commit 82d5f6d

File tree

4 files changed

+174
-9
lines changed

4 files changed

+174
-9
lines changed

src/com/rabbitmq/client/impl/Frame.java

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -297,6 +297,9 @@ else if(value instanceof byte[]) {
297297
else if(value instanceof List) {
298298
acc += 4 + arraySize((List<?>)value);
299299
}
300+
else if(value instanceof Object[]) {
301+
acc += 4 + arraySize((Object[])value);
302+
}
300303
else if(value == null) {
301304
}
302305
else {
@@ -305,7 +308,7 @@ else if(value == null) {
305308
return acc;
306309
}
307310

308-
/** Computes the AMQP wire-protocol length of an encoded field-array */
311+
/** Computes the AMQP wire-protocol length of an encoded field-array of type List<?> */
309312
public static long arraySize(List<?> values)
310313
throws UnsupportedEncodingException
311314
{
@@ -315,7 +318,16 @@ public static long arraySize(List<?> values)
315318
}
316319
return acc;
317320
}
318-
321+
322+
/** Computes the AMQP wire-protocol length of an encoded field-array of type Object[] */
323+
public static long arraySize(Object[] values) throws UnsupportedEncodingException {
324+
long acc = 0;
325+
for (Object value : values) {
326+
acc += fieldValueSize(value);
327+
}
328+
return acc;
329+
}
330+
319331
/** Computes the AMQP wire-protocol length of a protocol-encoded long string. */
320332
private static int longStrSize(String str)
321333
throws UnsupportedEncodingException

src/com/rabbitmq/client/impl/ValueWriter.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -200,6 +200,10 @@ else if(value instanceof List) {
200200
writeOctet('A');
201201
writeArray((List<?>)value);
202202
}
203+
else if(value instanceof Object[]) {
204+
writeOctet('A');
205+
writeArray((Object[])value);
206+
}
203207
else {
204208
throw new IllegalArgumentException
205209
("Invalid value type: " + value.getClass().getName());
@@ -220,6 +224,20 @@ public final void writeArray(List<?> value)
220224
}
221225
}
222226

227+
public final void writeArray(Object[] value)
228+
throws IOException
229+
{
230+
if (value==null) {
231+
out.write(0);
232+
}
233+
else {
234+
out.writeInt((int)Frame.arraySize(value));
235+
for (Object item : value) {
236+
writeFieldValue(item);
237+
}
238+
}
239+
}
240+
223241
/** Public API - encodes an octet from an int. */
224242
public final void writeOctet(int octet)
225243
throws IOException

test/src/com/rabbitmq/client/test/functional/QueueLifecycle.java

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,17 @@
1818
package com.rabbitmq.client.test.functional;
1919

2020
import java.io.IOException;
21+
import java.util.Arrays;
2122
import java.util.HashMap;
2223
import java.util.Map;
2324

2425
import com.rabbitmq.client.AMQP;
2526
import com.rabbitmq.client.QueueingConsumer;
2627
import com.rabbitmq.client.test.BrokerTestCase;
2728

28-
// Test queue auto-delete and exclusive semantics.
29+
/**
30+
* Test queue auto-delete and exclusive semantics.
31+
*/
2932
public class QueueLifecycle extends BrokerTestCase {
3033

3134
void verifyQueueExists(String name) throws IOException {
@@ -77,12 +80,13 @@ void verifyNotEquivalent(boolean durable, boolean exclusive,
7780
fail("Queue.declare should have been rejected as not equivalent");
7881
}
7982

80-
// From amqp-0-9-1.xml, for "passive" property, "equivalent" rule:
81-
// "If not set and the queue exists, the server MUST check that the
82-
// existing queue has the same values for durable, exclusive,
83-
// auto-delete, and arguments fields. The server MUST respond with
84-
// Declare-Ok if the requested queue matches these fields, and MUST
85-
// raise a channel exception if not."
83+
/** From amqp-0-9-1.xml, for "passive" property, "equivalent" rule:
84+
* "If not set and the queue exists, the server MUST check that the
85+
* existing queue has the same values for durable, exclusive,
86+
* auto-delete, and arguments fields. The server MUST respond with
87+
* Declare-Ok if the requested queue matches these fields, and MUST
88+
* raise a channel exception if not."
89+
*/
8690
public void testQueueEquivalence() throws IOException {
8791
String q = "queue";
8892
channel.queueDeclare(q, false, false, false, null);
@@ -154,4 +158,12 @@ public void testExclusiveGoesWithConnection() throws IOException {
154158
verifyQueueMissing(name);
155159
}
156160

161+
public void testArgumentArrays() throws IOException {
162+
Map<String, Object> args = new HashMap<String, Object>();
163+
String[] arr = new String[]{"foo", "bar", "baz"};
164+
args.put("my-key", arr);
165+
String queueName = "argumentArraysQueue";
166+
channel.queueDeclare(queueName, true, true, false, args);
167+
verifyQueue(queueName, true, true, false, args);
168+
}
157169
}

test/src/com/rabbitmq/client/test/functional/Transactions.java

Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -329,4 +329,127 @@ public void testShuffleAcksBeforeRollback()
329329
basicAck(tags[2], false);
330330
txCommit();
331331
}
332+
333+
private abstract class NackMethod {
334+
abstract public void nack(long tag, boolean requeue)
335+
throws IOException;
336+
337+
public void nack(boolean requeue)
338+
throws IOException
339+
{
340+
nack(latestTag, requeue);
341+
}
342+
343+
public void nack()
344+
throws IOException
345+
{
346+
nack(latestTag, true);
347+
}
348+
}
349+
350+
private NackMethod basicNack = new NackMethod() {
351+
public void nack(long tag, boolean requeue)
352+
throws IOException
353+
{
354+
channel.basicNack(tag, false, requeue);
355+
}
356+
};
357+
358+
private NackMethod basicReject = new NackMethod() {
359+
public void nack(long tag, boolean requeue)
360+
throws IOException
361+
{
362+
channel.basicReject(tag, requeue);
363+
}
364+
};
365+
366+
/*
367+
messages with nacks get requeued after the transaction commit.
368+
messages with nacks with requeue = false are not requeued.
369+
*/
370+
public void commitNacks(NackMethod method)
371+
throws IOException
372+
{
373+
basicPublish();
374+
basicPublish();
375+
txSelect();
376+
basicGet();
377+
method.nack();
378+
basicGet();
379+
method.nack(false);
380+
assertNull(basicGet());
381+
txCommit();
382+
assertNotNull(basicGet());
383+
assertNull(basicGet());
384+
}
385+
386+
public void rollbackNacks(NackMethod method)
387+
throws IOException
388+
{
389+
basicPublish();
390+
txSelect();
391+
basicGet();
392+
method.nack(true);
393+
txRollback();
394+
assertNull(basicGet());
395+
}
396+
397+
public void commitAcksAndNacks(NackMethod method)
398+
throws IOException
399+
{
400+
for (int i = 0; i < 3; i++) {
401+
basicPublish();
402+
}
403+
txSelect();
404+
long tags[] = new long[3];
405+
for (int i = 0; i < 3; i++) {
406+
tags[i] = basicGet().getEnvelope().getDeliveryTag();
407+
}
408+
basicAck(tags[1], false);
409+
basicAck(tags[0], false);
410+
method.nack(tags[2], false);
411+
txRollback();
412+
basicAck(tags[2], false);
413+
method.nack(tags[0], true);
414+
method.nack(tags[1], false);
415+
txCommit();
416+
assertNotNull(basicGet());
417+
assertNull(basicGet());
418+
}
419+
420+
public void testCommitNacks()
421+
throws IOException
422+
{
423+
commitNacks(basicNack);
424+
}
425+
426+
public void testRollbackNacks()
427+
throws IOException
428+
{
429+
rollbackNacks(basicNack);
430+
}
431+
432+
public void testCommitAcksAndNacks()
433+
throws IOException
434+
{
435+
commitAcksAndNacks(basicNack);
436+
}
437+
438+
public void testCommitRejects()
439+
throws IOException
440+
{
441+
commitNacks(basicReject);
442+
}
443+
444+
public void testRollbackRejects()
445+
throws IOException
446+
{
447+
rollbackNacks(basicReject);
448+
}
449+
450+
public void testCommitAcksAndRejects()
451+
throws IOException
452+
{
453+
commitAcksAndNacks(basicReject);
454+
}
332455
}

0 commit comments

Comments
 (0)