Skip to content

Commit bda43b2

Browse files
author
Rob Harrop
committed
Merge with default
2 parents e7ed4f2 + 9753e14 commit bda43b2

21 files changed

+344
-199
lines changed

codegen.py

Lines changed: 176 additions & 115 deletions
Large diffs are not rendered by default.

src/com/rabbitmq/client/Channel.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -696,4 +696,19 @@ void basicNack(long deliveryTag, boolean multiple, boolean requeue)
696696
* @return the sequence number of the next message to be published
697697
*/
698698
long getNextPublishSeqNo();
699+
700+
/**
701+
* Asynchronously send a method over this channel.
702+
* @param method method to transmit over this channel.
703+
* @throws IOException Problem transmitting method.
704+
*/
705+
void asyncRpc(Method method) throws IOException;
706+
707+
/**
708+
* Synchronously send a method over this channel.
709+
* @param method method to transmit over this channel.
710+
* @return response to method. Caller should cast as appropriate.
711+
* @throws IOException Problem transmitting method.
712+
*/
713+
Method rpc(Method method) throws IOException;
699714
}

src/com/rabbitmq/client/impl/AMQBasicProperties.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616

1717
package com.rabbitmq.client.impl;
1818

19-
import java.io.IOException;
2019
import java.util.Date;
2120
import java.util.Map;
2221
import java.util.Hashtable;
@@ -32,8 +31,7 @@ public Object clone() throws CloneNotSupportedException {
3231

3332
Map<String, Object> thisHeaders = getHeaders();
3433
if (thisHeaders != null) {
35-
Map<String, Object> headers = new Hashtable<String, Object>();
36-
headers.putAll(thisHeaders);
34+
Map<String, Object> headers = new Hashtable<String, Object>(thisHeaders);
3735
bpClone.setHeaders(headers);
3836
}
3937

src/com/rabbitmq/client/impl/AMQCommand.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -218,8 +218,9 @@ public static void checkEmptyContentBodyFrameSize() {
218218
}
219219
int actualLength = s.toByteArray().length;
220220
if (EMPTY_CONTENT_BODY_FRAME_SIZE != actualLength) {
221-
throw new AssertionError("Internal error: EMPTY_CONTENT_BODY_FRAME_SIZE is " + "incorrect - defined as " + EMPTY_CONTENT_BODY_FRAME_SIZE
222-
+ ", where the computed value is in fact " + actualLength);
221+
throw new AssertionError("Internal error: expected EMPTY_CONTENT_BODY_FRAME_SIZE("
222+
+ EMPTY_CONTENT_BODY_FRAME_SIZE
223+
+ ") is not equal to computed value: " + actualLength);
223224
}
224225
}
225226

src/com/rabbitmq/client/impl/AMQConnection.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,7 @@ public void ensureIsOpen()
146146
private int _heartbeat;
147147

148148
private final String _virtualHost;
149-
private final int _requestedChannelMax, _requestedFrameMax, _requestedHeartbeat;
149+
// private final int _requestedChannelMax, _requestedFrameMax, _requestedHeartbeat;
150150
private final Map<String, Object> _clientProperties;
151151

152152
/** Saved server properties field from connection.start */
@@ -194,9 +194,6 @@ public AMQConnection(ConnectionFactory factory,
194194
checkPreconditions();
195195

196196
_virtualHost = factory.getVirtualHost();
197-
_requestedChannelMax = factory.getRequestedChannelMax();
198-
_requestedFrameMax = factory.getRequestedFrameMax();
199-
_requestedHeartbeat = factory.getRequestedHeartbeat();
200197
_clientProperties = new HashMap<String, Object>(factory.getClientProperties());
201198

202199
_factory = factory;

src/com/rabbitmq/client/impl/ChannelManager.java

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -124,35 +124,35 @@ private void addChannel(ChannelN chan) {
124124
}
125125

126126
/**
127-
* Remove the argument channel from the channel map.
128-
* This method must be safe to call multiple times on the same channel. If
127+
* Remove the argument channel from the channel map.
128+
* This method must be safe to call multiple times on the same channel. If
129129
* it is not then things go badly wrong.
130130
*/
131131
public synchronized void disconnectChannel(ChannelN channel) {
132132
int channelNumber = channel.getChannelNumber();
133-
133+
134134
// Warning, here be dragons. Not great big ones, but little baby ones
135-
// which will nibble on your toes and occasionally trip you up when
136-
// you least expect it.
137-
// Basically, there's a race that can end us up here. It almost never
138-
// happens, but it's easier to repair it when it does than prevent it
139-
// from happening in the first place.
135+
// which will nibble on your toes and occasionally trip you up when
136+
// you least expect it.
137+
// Basically, there's a race that can end us up here. It almost never
138+
// happens, but it's easier to repair it when it does than prevent it
139+
// from happening in the first place.
140140
// If we end up doing a Channel.close in one thread and a Channel.open
141141
// with the same channel number in another, the two can overlap in such
142-
// a way as to cause disconnectChannel on the old channel to try to
142+
// a way as to cause disconnectChannel on the old channel to try to
143143
// remove the new one. Ideally we would fix this race at the source,
144-
// but it's much easier to just catch it here.
144+
// but it's much easier to just catch it here.
145145
synchronized (_channelMap) {
146146
ChannelN existing = _channelMap.remove(channelNumber);
147-
// Nothing to do here. Move along.
147+
// Nothing to do here. Move along.
148148
if (existing == null) return;
149149
// Oops, we've gone and stomped on someone else's channel. Put it back
150-
// and pretend we didn't touch it.
150+
// and pretend we didn't touch it.
151151
else if (existing != channel) {
152152
_channelMap.put(channelNumber, existing);
153153
return;
154154
}
155155
channelNumberAllocator.free(channelNumber);
156-
}
156+
}
157157
}
158158
}

src/com/rabbitmq/client/impl/ChannelN.java

Lines changed: 31 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -242,7 +242,7 @@ public void releaseChannelNumber() {
242242
if (method instanceof Basic.Deliver) {
243243
Basic.Deliver m = (Basic.Deliver) method;
244244

245-
Consumer callback = _consumers.get(m.consumerTag);
245+
Consumer callback = _consumers.get(m.getConsumerTag());
246246
if (callback == null) {
247247
if (defaultConsumer == null) {
248248
// No handler set. We should blow up as this message
@@ -257,20 +257,20 @@ public void releaseChannelNumber() {
257257
}
258258
}
259259

260-
Envelope envelope = new Envelope(m.deliveryTag,
261-
m.redelivered,
262-
m.exchange,
263-
m.routingKey);
260+
Envelope envelope = new Envelope(m.getDeliveryTag(),
261+
m.getRedelivered(),
262+
m.getExchange(),
263+
m.getRoutingKey());
264264
try {
265-
callback.handleDelivery(m.consumerTag,
265+
callback.handleDelivery(m.getConsumerTag(),
266266
envelope,
267267
(BasicProperties) command.getContentHeader(),
268268
command.getContentBody());
269269
} catch (Throwable ex) {
270270
_connection.getExceptionHandler().handleConsumerException(this,
271271
ex,
272272
callback,
273-
m.consumerTag,
273+
m.getConsumerTag(),
274274
"handleDelivery");
275275
}
276276
return true;
@@ -279,10 +279,10 @@ public void releaseChannelNumber() {
279279
if (l != null) {
280280
Basic.Return basicReturn = (Basic.Return) method;
281281
try {
282-
l.handleReturn(basicReturn.replyCode,
283-
basicReturn.replyText,
284-
basicReturn.exchange,
285-
basicReturn.routingKey,
282+
l.handleReturn(basicReturn.getReplyCode(),
283+
basicReturn.getReplyText(),
284+
basicReturn.getExchange(),
285+
basicReturn.getRoutingKey(),
286286
(BasicProperties)
287287
command.getContentHeader(),
288288
command.getContentBody());
@@ -295,14 +295,14 @@ public void releaseChannelNumber() {
295295
} else if (method instanceof Channel.Flow) {
296296
Channel.Flow channelFlow = (Channel.Flow) method;
297297
synchronized (_channelMutex) {
298-
_blockContent = !channelFlow.active;
299-
transmit(new Channel.FlowOk(channelFlow.active));
298+
_blockContent = !channelFlow.getActive();
299+
transmit(new Channel.FlowOk(!_blockContent));
300300
_channelMutex.notifyAll();
301301
}
302302
FlowListener l = getFlowListener();
303303
if (l != null) {
304304
try {
305-
l.handleFlow(channelFlow.active);
305+
l.handleFlow(channelFlow.getActive());
306306
} catch (Throwable ex) {
307307
_connection.getExceptionHandler().handleFlowListenerException(this, ex);
308308
}
@@ -711,13 +711,13 @@ public GetResponse basicGet(String queue, boolean autoAck)
711711

712712
if (method instanceof Basic.GetOk) {
713713
Basic.GetOk getOk = (Basic.GetOk)method;
714-
Envelope envelope = new Envelope(getOk.deliveryTag,
715-
getOk.redelivered,
716-
getOk.exchange,
717-
getOk.routingKey);
714+
Envelope envelope = new Envelope(getOk.getDeliveryTag(),
715+
getOk.getRedelivered(),
716+
getOk.getExchange(),
717+
getOk.getRoutingKey());
718718
BasicProperties props = (BasicProperties)replyCommand.getContentHeader();
719719
byte[] body = replyCommand.getContentBody();
720-
int messageCount = getOk.messageCount;
720+
int messageCount = getOk.getMessageCount();
721721
return new GetResponse(envelope, props, body, messageCount);
722722
} else if (method instanceof Basic.GetEmpty) {
723723
return null;
@@ -777,7 +777,7 @@ public String basicConsume(String queue, boolean autoAck, String consumerTag,
777777
{
778778
BlockingRpcContinuation<String> k = new BlockingRpcContinuation<String>() {
779779
public String transformReply(AMQCommand replyCommand) {
780-
String actualConsumerTag = ((Basic.ConsumeOk) replyCommand.getMethod()).consumerTag;
780+
String actualConsumerTag = ((Basic.ConsumeOk) replyCommand.getMethod()).getConsumerTag();
781781
_consumers.put(actualConsumerTag, callback);
782782
// We need to call back inside the connection thread
783783
// in order avoid races with 'deliver' commands
@@ -896,7 +896,7 @@ public Confirm.SelectOk confirmSelect()
896896

897897
/** Public API - {@inheritDoc} */
898898
public Channel.FlowOk flow(final boolean a) throws IOException {
899-
return (Channel.FlowOk) exnWrappingRpc(new Channel.Flow() {{active = a;}}).getMethod();
899+
return (Channel.FlowOk) exnWrappingRpc(new Channel.Flow(a)).getMethod();
900900
}
901901

902902
/** Public API - {@inheritDoc} */
@@ -908,5 +908,14 @@ public Channel.FlowOk getFlow() {
908908
public long getNextPublishSeqNo() {
909909
return nextPublishSeqNo;
910910
}
911-
911+
912+
public void asyncRpc(com.rabbitmq.client.Method method) throws IOException {
913+
// This cast should eventually go
914+
transmit((com.rabbitmq.client.impl.Method)method);
915+
}
916+
917+
public com.rabbitmq.client.Method rpc(com.rabbitmq.client.Method method) throws IOException {
918+
return exnWrappingRpc((com.rabbitmq.client.impl.Method)method).getMethod();
919+
}
920+
912921
}

src/com/rabbitmq/client/impl/ContentHeaderPropertyWriter.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,6 @@
1919

2020
import java.io.DataOutputStream;
2121
import java.io.IOException;
22-
import java.math.BigDecimal;
23-
import java.math.BigInteger;
2422
import java.util.Date;
2523
import java.util.Map;
2624

src/com/rabbitmq/client/impl/LongStringHelper.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ public class LongStringHelper
3232
* Private API - Implementation of {@link LongString}. When
3333
* interpreting bytes as a string, uses UTF-8 encoding.
3434
*/
35-
public static class ByteArrayLongString
35+
private static class ByteArrayLongString
3636
implements LongString
3737
{
3838
byte [] bytes;

src/com/rabbitmq/client/impl/Method.java

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -52,13 +52,6 @@ public abstract class Method implements com.rabbitmq.client.Method {
5252
*/
5353
public abstract Object visit(MethodVisitor visitor) throws IOException;
5454

55-
/**
56-
* Private API - Autogenerated reader for this method.
57-
* @param reader interface to an object to read the method arguments
58-
* @throws IOException if an error is encountered
59-
*/
60-
public abstract void readArgumentsFrom(MethodArgumentReader reader) throws IOException;
61-
6255
/**
6356
* Private API - Autogenerated writer for this method.
6457
* @param writer interface to an object to write the method arguments

0 commit comments

Comments
 (0)