Skip to content

Commit 283abe7

Browse files
author
Steve Powell
committed
Merge default into bug24211
2 parents 99b064e + 93b3482 commit 283abe7

File tree

3 files changed

+100
-43
lines changed

3 files changed

+100
-43
lines changed

src/com/rabbitmq/client/impl/AMQChannel.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -118,11 +118,11 @@ public static IOException wrap(ShutdownSignalException ex, String message) {
118118
/**
119119
* Placeholder until we address bug 15786 (implementing a proper exception hierarchy).
120120
*/
121-
public AMQCommand exnWrappingRpc(Method m)
121+
public AMQCommand exnWrappingRpc(com.rabbitmq.client.Method m)
122122
throws IOException
123123
{
124124
try {
125-
return rpc(m);
125+
return rpc((com.rabbitmq.client.impl.Method)m);
126126
} catch (AlreadyClosedException ace) {
127127
// Do not wrap it since it means that connection/channel
128128
// was closed in some action in the past

src/com/rabbitmq/client/impl/AMQCommand.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ public AMQCommand() {
6969
* Construct a command with just a method, and without header or body.
7070
* @param method the wrapped method
7171
*/
72-
public AMQCommand(Method method) {
72+
public AMQCommand(com.rabbitmq.client.Method method) {
7373
this(method, null, null);
7474
}
7575

@@ -79,8 +79,8 @@ public AMQCommand(Method method) {
7979
* @param contentHeader the wrapped content header
8080
* @param body the message body data
8181
*/
82-
public AMQCommand(Method method, AMQContentHeader contentHeader, byte[] body) {
83-
_method = method;
82+
public AMQCommand(com.rabbitmq.client.Method method, AMQContentHeader contentHeader, byte[] body) {
83+
_method = (Method) method;
8484
_contentHeader = contentHeader;
8585
setContentBody(body);
8686
}

src/com/rabbitmq/client/impl/ChannelN.java

Lines changed: 95 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -59,13 +59,6 @@
5959
public class ChannelN extends AMQChannel implements com.rabbitmq.client.Channel {
6060
private static final String UNSPECIFIED_OUT_OF_BAND = "";
6161

62-
/**
63-
* When 0.9.1 is signed off, tickets can be removed from the codec
64-
* and this field can be deleted.
65-
*/
66-
@Deprecated
67-
private static final int TICKET = 0;
68-
6962
/**
7063
* Map from consumer tag to {@link Consumer} instance.
7164
*
@@ -518,8 +511,12 @@ public void basicPublish(String exchange, String routingKey,
518511
if (props == null) {
519512
useProps = MessageProperties.MINIMAL_BASIC;
520513
}
521-
transmit(new AMQCommand(new Basic.Publish(TICKET, exchange, routingKey,
522-
mandatory, immediate),
514+
transmit(new AMQCommand(new Basic.Publish.Builder()
515+
.exchange(exchange)
516+
.routingKey(routingKey)
517+
.mandatory(mandatory)
518+
.immediate(immediate)
519+
.build(),
523520
useProps, body));
524521
}
525522

@@ -543,10 +540,15 @@ public Exchange.DeclareOk exchangeDeclare(String exchange, String type,
543540
throws IOException
544541
{
545542
return (Exchange.DeclareOk)
546-
exnWrappingRpc(new Exchange.Declare(TICKET, exchange, type,
547-
false, durable, autoDelete,
548-
internal, false,
549-
arguments)).getMethod();
543+
exnWrappingRpc(new Exchange.Declare.Builder()
544+
.exchange(exchange)
545+
.type(type)
546+
.durable(durable)
547+
.autoDelete(autoDelete)
548+
.internal(internal)
549+
.arguments(arguments)
550+
.build())
551+
.getMethod();
550552
}
551553

552554
/** Public API - {@inheritDoc} */
@@ -569,17 +571,24 @@ public Exchange.DeclareOk exchangeDeclarePassive(String exchange)
569571
throws IOException
570572
{
571573
return (Exchange.DeclareOk)
572-
exnWrappingRpc(new Exchange.Declare(TICKET, exchange, "",
573-
true, false, false,
574-
false, false, null)).getMethod();
574+
exnWrappingRpc(new Exchange.Declare.Builder()
575+
.exchange(exchange)
576+
.type("")
577+
.passive()
578+
.build())
579+
.getMethod();
575580
}
576581

577582
/** Public API - {@inheritDoc} */
578583
public Exchange.DeleteOk exchangeDelete(String exchange, boolean ifUnused)
579584
throws IOException
580585
{
581586
return (Exchange.DeleteOk)
582-
exnWrappingRpc(new Exchange.Delete(TICKET, exchange, ifUnused, false)).getMethod();
587+
exnWrappingRpc(new Exchange.Delete.Builder()
588+
.exchange(exchange)
589+
.ifUnused(ifUnused)
590+
.build())
591+
.getMethod();
583592
}
584593

585594
/** Public API - {@inheritDoc} */
@@ -593,9 +602,14 @@ public Exchange.DeleteOk exchangeDelete(String exchange)
593602
public Exchange.BindOk exchangeBind(String destination, String source,
594603
String routingKey, Map<String, Object> arguments)
595604
throws IOException {
596-
return (Exchange.BindOk) exnWrappingRpc(
597-
new Exchange.Bind(TICKET, destination, source, routingKey,
598-
false, arguments)).getMethod();
605+
return (Exchange.BindOk)
606+
exnWrappingRpc(new Exchange.Bind.Builder()
607+
.destination(destination)
608+
.source(source)
609+
.routingKey(routingKey)
610+
.arguments(arguments)
611+
.build())
612+
.getMethod();
599613
}
600614

601615
/** Public API - {@inheritDoc} */
@@ -608,9 +622,14 @@ public Exchange.BindOk exchangeBind(String destination, String source,
608622
public Exchange.UnbindOk exchangeUnbind(String destination, String source,
609623
String routingKey, Map<String, Object> arguments)
610624
throws IOException {
611-
return (Exchange.UnbindOk) exnWrappingRpc(
612-
new Exchange.Unbind(TICKET, destination, source, routingKey,
613-
false, arguments)).getMethod();
625+
return (Exchange.UnbindOk)
626+
exnWrappingRpc(new Exchange.Unbind.Builder()
627+
.destination(destination)
628+
.source(source)
629+
.routingKey(routingKey)
630+
.arguments(arguments)
631+
.build())
632+
.getMethod();
614633
}
615634

616635
/** Public API - {@inheritDoc} */
@@ -625,8 +644,14 @@ public Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclu
625644
throws IOException
626645
{
627646
return (Queue.DeclareOk)
628-
exnWrappingRpc(new Queue.Declare(TICKET, queue, false, durable,
629-
exclusive, autoDelete, false, arguments)).getMethod();
647+
exnWrappingRpc(new Queue.Declare.Builder()
648+
.queue(queue)
649+
.durable(durable)
650+
.exclusive(exclusive)
651+
.autoDelete(autoDelete)
652+
.arguments(arguments)
653+
.build())
654+
.getMethod();
630655
}
631656

632657
/** Public API - {@inheritDoc} */
@@ -641,16 +666,26 @@ public Queue.DeclareOk queueDeclarePassive(String queue)
641666
throws IOException
642667
{
643668
return (Queue.DeclareOk)
644-
exnWrappingRpc(new Queue.Declare(TICKET, queue, true, false,
645-
true, true, false, null)).getMethod();
669+
exnWrappingRpc(new Queue.Declare.Builder()
670+
.queue(queue)
671+
.passive()
672+
.exclusive()
673+
.autoDelete()
674+
.build())
675+
.getMethod();
646676
}
647677

648678
/** Public API - {@inheritDoc} */
649679
public Queue.DeleteOk queueDelete(String queue, boolean ifUnused, boolean ifEmpty)
650680
throws IOException
651681
{
652682
return (Queue.DeleteOk)
653-
exnWrappingRpc(new Queue.Delete(TICKET, queue, ifUnused, ifEmpty, false)).getMethod();
683+
exnWrappingRpc(new Queue.Delete.Builder()
684+
.queue(queue)
685+
.ifUnused(ifUnused)
686+
.ifEmpty(ifEmpty)
687+
.build())
688+
.getMethod();
654689
}
655690

656691
/** Public API - {@inheritDoc} */
@@ -666,8 +701,13 @@ public Queue.BindOk queueBind(String queue, String exchange,
666701
throws IOException
667702
{
668703
return (Queue.BindOk)
669-
exnWrappingRpc(new Queue.Bind(TICKET, queue, exchange, routingKey,
670-
false, arguments)).getMethod();
704+
exnWrappingRpc(new Queue.Bind.Builder()
705+
.queue(queue)
706+
.exchange(exchange)
707+
.routingKey(routingKey)
708+
.arguments(arguments)
709+
.build())
710+
.getMethod();
671711
}
672712

673713
/** Public API - {@inheritDoc} */
@@ -684,16 +724,24 @@ public Queue.UnbindOk queueUnbind(String queue, String exchange, String routingK
684724
throws IOException
685725
{
686726
return (Queue.UnbindOk)
687-
exnWrappingRpc(new Queue.Unbind(TICKET, queue, exchange, routingKey,
688-
arguments)).getMethod();
727+
exnWrappingRpc(new Queue.Unbind.Builder()
728+
.queue(queue)
729+
.exchange(exchange)
730+
.routingKey(routingKey)
731+
.arguments(arguments)
732+
.build())
733+
.getMethod();
689734
}
690735

691736
/** Public API - {@inheritDoc} */
692737
public Queue.PurgeOk queuePurge(String queue)
693738
throws IOException
694739
{
695740
return (Queue.PurgeOk)
696-
exnWrappingRpc(new Queue.Purge(TICKET, queue, false)).getMethod();
741+
exnWrappingRpc(new Queue.Purge.Builder()
742+
.queue(queue)
743+
.build())
744+
.getMethod();
697745
}
698746

699747
/** Public API - {@inheritDoc} */
@@ -707,7 +755,10 @@ public Queue.UnbindOk queueUnbind(String queue, String exchange, String routingK
707755
public GetResponse basicGet(String queue, boolean autoAck)
708756
throws IOException
709757
{
710-
AMQCommand replyCommand = exnWrappingRpc(new Basic.Get(TICKET, queue, autoAck));
758+
AMQCommand replyCommand = exnWrappingRpc(new Basic.Get.Builder()
759+
.queue(queue)
760+
.noAck(autoAck)
761+
.build());
711762
Method method = replyCommand.getMethod();
712763

713764
if (method instanceof Basic.GetOk) {
@@ -795,9 +846,15 @@ public String transformReply(AMQCommand replyCommand) {
795846
}
796847
};
797848

798-
rpc(new Basic.Consume(TICKET, queue, consumerTag,
799-
noLocal, autoAck, exclusive,
800-
false, arguments),
849+
rpc((Method)
850+
new Basic.Consume.Builder()
851+
.queue(queue)
852+
.consumerTag(consumerTag)
853+
.noLocal(noLocal)
854+
.noAck(autoAck)
855+
.exclusive(exclusive)
856+
.arguments(arguments)
857+
.build(),
801858
k);
802859

803860
try {
@@ -915,7 +972,7 @@ public void asyncRpc(com.rabbitmq.client.Method method) throws IOException {
915972
transmit((com.rabbitmq.client.impl.Method)method);
916973
}
917974

918-
public com.rabbitmq.client.Method rpc(com.rabbitmq.client.Method method) throws IOException {
975+
public Method rpc(com.rabbitmq.client.Method method) throws IOException {
919976
return exnWrappingRpc((com.rabbitmq.client.impl.Method)method).getMethod();
920977
}
921978

0 commit comments

Comments
 (0)