Skip to content

Commit cc5629e

Browse files
author
Steve Powell
committed
Merge in default
2 parents 397672f + 63421cc commit cc5629e

File tree

7 files changed

+168
-107
lines changed

7 files changed

+168
-107
lines changed

codegen.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -264,12 +264,12 @@ def printWritePropertiesTo(c):
264264
print " if (this.%s != null) writer.write%s(this.%s);" % (jfName, jfClass, jfName)
265265
print " }"
266266

267-
def printAppendArgumentDebugStringTo(c):
267+
def printAppendPropertyDebugStringTo(c):
268268
appendList = [ "%s=\")\n .append(this.%s)\n .append(\""
269269
% (f.name, java_field_name(f.name))
270270
for f in c.fields ]
271271
print
272-
print " public void appendArgumentDebugStringTo(StringBuilder acc) {"
272+
print " public void appendPropertyDebugStringTo(StringBuilder acc) {"
273273
print " acc.append(\"(%s)\");" % (", ".join(appendList))
274274
print " }"
275275

@@ -386,7 +386,7 @@ def printSetter(fieldType, fieldName):
386386
printSetter(jType, jName)
387387

388388
printWritePropertiesTo(c)
389-
printAppendArgumentDebugStringTo(c)
389+
printAppendPropertyDebugStringTo(c)
390390
printPropertiesBuilderClass(c)
391391

392392
print " }"

src/com/rabbitmq/client/Channel.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -638,7 +638,7 @@ void basicNack(long deliveryTag, boolean multiple, boolean requeue)
638638
* Cancel a consumer. Calls the consumer's {@link Consumer#handleCancelOk}
639639
* method.
640640
* @param consumerTag a client- or server-generated consumer tag to establish context
641-
* @throws java.io.IOException if an error is encountered
641+
* @throws IOException if an error is encountered, or if the consumerTag is unknown
642642
* @see com.rabbitmq.client.AMQP.Basic.Cancel
643643
* @see com.rabbitmq.client.AMQP.Basic.CancelOk
644644
*/
@@ -649,12 +649,12 @@ void basicNack(long deliveryTag, boolean multiple, boolean requeue)
649649
* basic.recover is asynchronous; in 0-9-1 it is synchronous, and
650650
* the new, deprecated method basic.recover_async is asynchronous.
651651
* <p/>
652-
* Equivalent to calling <code>basicRecover(true)</code>, messages
653-
* will be requeued and possibly delivered to a different consumer.
652+
* Equivalent to calling <code>basicRecover(true)</code>, messages
653+
* will be requeued and possibly delivered to a different consumer.
654654
* @see #basicRecover(boolean)
655655
*/
656656
Basic.RecoverOk basicRecover() throws IOException;
657-
657+
658658
/**
659659
* Ask the broker to resend unacknowledged messages. In 0-8
660660
* basic.recover is asynchronous; in 0-9-1 it is synchronous, and

src/com/rabbitmq/client/impl/AMQCommand.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,7 @@ private static StringBuilder contentBodyStringBuilder(byte[] body, boolean suppr
146146
if (suppressBody) {
147147
return new StringBuilder().append(body.length).append(" bytes of payload");
148148
} else {
149-
return new StringBuilder().append('\"').append(body).append('\"');
149+
return new StringBuilder().append('\"').append(new String(body, "UTF-8")).append('\"');
150150
}
151151
} catch (Exception e) {
152152
return new StringBuilder().append('|').append(body.length).append('|');

src/com/rabbitmq/client/impl/ChannelN.java

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -233,7 +233,7 @@ private CountDownLatch broadcastShutdownSignal(ShutdownSignalException signal) {
233233
}
234234
}
235235

236-
public CountDownLatch getShutdownLatch() {
236+
CountDownLatch getShutdownLatch() {
237237
return this.finishedShutdownFlag;
238238
}
239239

@@ -387,7 +387,7 @@ private void callReturnListeners(Command command, Basic.Return basicReturn) {
387387
}
388388
}
389389

390-
private void callFlowListeners(Command command, Channel.Flow channelFlow) {
390+
private void callFlowListeners(@SuppressWarnings("unused") Command command, Channel.Flow channelFlow) {
391391
try {
392392
for (FlowListener l : this.flowListeners) {
393393
l.handleFlow(channelFlow.getActive());
@@ -397,7 +397,7 @@ private void callFlowListeners(Command command, Channel.Flow channelFlow) {
397397
}
398398
}
399399

400-
private void callConfirmListeners(Command command, Basic.Ack ack) {
400+
private void callConfirmListeners(@SuppressWarnings("unused") Command command, Basic.Ack ack) {
401401
try {
402402
for (ConfirmListener l : this.confirmListeners) {
403403
l.handleAck(ack.getDeliveryTag(), ack.getMultiple());
@@ -407,7 +407,7 @@ private void callConfirmListeners(Command command, Basic.Ack ack) {
407407
}
408408
}
409409

410-
private void callConfirmListeners(Command command, Basic.Nack nack) {
410+
private void callConfirmListeners(@SuppressWarnings("unused") Command command, Basic.Nack nack) {
411411
try {
412412
for (ConfirmListener l : this.confirmListeners) {
413413
l.handleNack(nack.getDeliveryTag(), nack.getMultiple());
@@ -462,10 +462,17 @@ public void abort(int closeCode, String closeMessage)
462462
close(closeCode, closeMessage, true, null, true);
463463
}
464464

465+
// TODO: method should be private
465466
/**
466467
* Protected API - Close channel with code and message, indicating
467468
* the source of the closure and a causing exception (null if
468469
* none).
470+
* @param closeCode the close code (See under "Reply Codes" in the AMQP specification)
471+
* @param closeMessage a message indicating the reason for closing the connection
472+
* @param initiatedByApplication true if this comes from an API call, false otherwise
473+
* @param cause exception triggering close
474+
* @param abort true if we should close and ignore errors
475+
* @throws IOException if an error is encountered
469476
*/
470477
public void close(int closeCode,
471478
String closeMessage,
@@ -907,15 +914,16 @@ public String transformReply(AMQCommand replyCommand) {
907914
public void basicCancel(final String consumerTag)
908915
throws IOException
909916
{
917+
final Consumer originalConsumer = _consumers.get(consumerTag);
918+
if (originalConsumer == null)
919+
throw new IOException("Unknown consumerTag");
910920
BlockingRpcContinuation<Consumer> k = new BlockingRpcContinuation<Consumer>() {
911921
public Consumer transformReply(AMQCommand replyCommand) {
912922
Basic.CancelOk dummy = (Basic.CancelOk) replyCommand.getMethod();
913923
Utility.use(dummy);
914-
Consumer callback = _consumers.remove(consumerTag);
915-
// We need to call back inside the connection thread
916-
// in order avoid races with 'deliver' commands
917-
dispatcher.handleCancelOk(callback, consumerTag);
918-
return callback;
924+
_consumers.remove(consumerTag); //may already have been removed
925+
dispatcher.handleCancelOk(originalConsumer, consumerTag);
926+
return originalConsumer;
919927
}
920928
};
921929

test/src/com/rabbitmq/examples/ConsumerMain.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -188,13 +188,13 @@ public void report(boolean writeStats) throws IOException {
188188
sumL += v;
189189
}
190190

191-
double avgL = sumL / messageCount;
192-
System.out.println("CONSUMER - Message count: " + messageCount);
193-
System.out.println("Total time, milliseconds: " + totalDelta);
194-
System.out.println("Overall messages-per-second: " + (messageCount / (totalDelta / 1000.0)));
195-
System.out.println("Min latency, milliseconds: " + minL);
196-
System.out.println("Avg latency, milliseconds: " + avgL);
197-
System.out.println("Max latency, milliseconds: " + maxL);
191+
System.out.println("CONSUMER - Overall: "
192+
+ String.format("%d messages in %dms, a rate of %.2f msgs/sec", messageCount,
193+
totalDelta,
194+
(messageCount / (totalDelta / 1000.0))));
195+
System.out.println("Latency - Min (Avg) Max: "
196+
+ String.format("%dms (%.2fms) %dms", minL, sumL
197+
/ messageCount, maxL));
198198

199199
if (writeStats) {
200200
PrintStream o = new PrintStream(new FileOutputStream("simple-latency-experiment.csv"));

0 commit comments

Comments
 (0)