Skip to content

Commit 86b0a3c

Browse files
author
Steve Powell
committed
Merge in default
2 parents 82492e1 + 63421cc commit 86b0a3c

33 files changed

+422
-311
lines changed

codegen.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -264,12 +264,12 @@ def printWritePropertiesTo(c):
264264
print " if (this.%s != null) writer.write%s(this.%s);" % (jfName, jfClass, jfName)
265265
print " }"
266266

267-
def printAppendArgumentDebugStringTo(c):
267+
def printAppendPropertyDebugStringTo(c):
268268
appendList = [ "%s=\")\n .append(this.%s)\n .append(\""
269269
% (f.name, java_field_name(f.name))
270270
for f in c.fields ]
271271
print
272-
print " public void appendArgumentDebugStringTo(StringBuilder acc) {"
272+
print " public void appendPropertyDebugStringTo(StringBuilder acc) {"
273273
print " acc.append(\"(%s)\");" % (", ".join(appendList))
274274
print " }"
275275

@@ -386,7 +386,7 @@ def printSetter(fieldType, fieldName):
386386
printSetter(jType, jName)
387387

388388
printWritePropertiesTo(c)
389-
printAppendArgumentDebugStringTo(c)
389+
printAppendPropertyDebugStringTo(c)
390390
printPropertiesBuilderClass(c)
391391

392392
print " }"

src/com/rabbitmq/client/Channel.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -638,7 +638,7 @@ void basicNack(long deliveryTag, boolean multiple, boolean requeue)
638638
* Cancel a consumer. Calls the consumer's {@link Consumer#handleCancelOk}
639639
* method.
640640
* @param consumerTag a client- or server-generated consumer tag to establish context
641-
* @throws java.io.IOException if an error is encountered
641+
* @throws IOException if an error is encountered, or if the consumerTag is unknown
642642
* @see com.rabbitmq.client.AMQP.Basic.Cancel
643643
* @see com.rabbitmq.client.AMQP.Basic.CancelOk
644644
*/
@@ -649,12 +649,12 @@ void basicNack(long deliveryTag, boolean multiple, boolean requeue)
649649
* basic.recover is asynchronous; in 0-9-1 it is synchronous, and
650650
* the new, deprecated method basic.recover_async is asynchronous.
651651
* <p/>
652-
* Equivalent to calling <code>basicRecover(true)</code>, messages
653-
* will be requeued and possibly delivered to a different consumer.
652+
* Equivalent to calling <code>basicRecover(true)</code>, messages
653+
* will be requeued and possibly delivered to a different consumer.
654654
* @see #basicRecover(boolean)
655655
*/
656656
Basic.RecoverOk basicRecover() throws IOException;
657-
657+
658658
/**
659659
* Ask the broker to resend unacknowledged messages. In 0-8
660660
* basic.recover is asynchronous; in 0-9-1 it is synchronous, and

src/com/rabbitmq/client/ConnectionFactory.java

Lines changed: 48 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import java.security.NoSuchAlgorithmException;
2222
import java.util.Map;
2323
import java.util.concurrent.ExecutorService;
24-
import java.util.concurrent.Executors;
2524

2625
import java.net.InetSocketAddress;
2726
import java.net.Socket;
@@ -43,39 +42,40 @@
4342
*/
4443

4544
public class ConnectionFactory implements Cloneable {
46-
45+
4746
/** Default Executor threads */
48-
private static final int DEFAULT_NUM_CONSUMER_THREADS = 5;
47+
@Deprecated
48+
public static final int DEFAULT_NUM_CONSUMER_THREADS = 5;
4949
/** Default user name */
50-
private static final String DEFAULT_USER = "guest";
50+
public static final String DEFAULT_USER = "guest";
5151
/** Default password */
52-
private static final String DEFAULT_PASS = "guest";
52+
public static final String DEFAULT_PASS = "guest";
5353
/** Default virtual host */
54-
private static final String DEFAULT_VHOST = "/";
54+
public static final String DEFAULT_VHOST = "/";
5555
/** Default maximum channel number;
5656
* zero for unlimited */
57-
private static final int DEFAULT_CHANNEL_MAX = 0;
57+
public static final int DEFAULT_CHANNEL_MAX = 0;
5858
/** Default maximum frame size;
5959
* zero means no limit */
60-
private static final int DEFAULT_FRAME_MAX = 0;
60+
public static final int DEFAULT_FRAME_MAX = 0;
6161
/** Default heart-beat interval;
6262
* zero means no heart-beats */
63-
private static final int DEFAULT_HEARTBEAT = 0;
63+
public static final int DEFAULT_HEARTBEAT = 0;
6464
/** The default host */
65-
private static final String DEFAULT_HOST = "localhost";
65+
public static final String DEFAULT_HOST = "localhost";
6666
/** 'Use the default port' port */
67-
private static final int USE_DEFAULT_PORT = -1;
67+
public static final int USE_DEFAULT_PORT = -1;
6868
/** The default non-ssl port */
69-
private static final int DEFAULT_AMQP_PORT = AMQP.PROTOCOL.PORT;
69+
public static final int DEFAULT_AMQP_PORT = AMQP.PROTOCOL.PORT;
7070
/** The default ssl port */
71-
private static final int DEFAULT_AMQP_OVER_SSL_PORT = 5671;
71+
public static final int DEFAULT_AMQP_OVER_SSL_PORT = 5671;
7272
/** The default connection timeout;
7373
* zero means wait indefinitely */
74-
private static final int DEFAULT_CONNECTION_TIMEOUT = 0;
74+
public static final int DEFAULT_CONNECTION_TIMEOUT = 0;
75+
7576
/** The default SSL protocol */
7677
private static final String DEFAULT_SSL_PROTOCOL = "SSLv3";
7778

78-
private int numConsumerThreads = DEFAULT_NUM_CONSUMER_THREADS;
7979
private String username = DEFAULT_USER;
8080
private String password = DEFAULT_PASS;
8181
private String virtualHost = DEFAULT_VHOST;
@@ -90,13 +90,15 @@ public class ConnectionFactory implements Cloneable {
9090
private SaslConfig saslConfig = DefaultSaslConfig.PLAIN;
9191

9292
/** @return number of consumer threads in default {@link ExecutorService} */
93+
@Deprecated
9394
public int getNumConsumerThreads() {
94-
return numConsumerThreads;
95+
return DEFAULT_NUM_CONSUMER_THREADS;
9596
}
9697

9798
/** @param numConsumerThreads threads in created private executor service */
99+
@Deprecated
98100
public void setNumConsumerThreads(int numConsumerThreads) {
99-
this.numConsumerThreads = numConsumerThreads;
101+
throw new IllegalArgumentException("setNumConsumerThreads not supported -- create explicit ExecutorService instead.");
100102
}
101103

102104
/** @return the default host to use for connections */
@@ -436,10 +438,22 @@ protected FrameHandler createFrameHandler(Address addr)
436438

437439
String hostName = addr.getHost();
438440
int portNumber = portOrDefault(addr.getPort());
439-
Socket socket = factory.createSocket();
440-
configureSocket(socket);
441-
socket.connect(new InetSocketAddress(hostName, portNumber), connectionTimeout);
442-
return createFrameHandler(socket);
441+
Socket socket = null;
442+
try {
443+
socket = factory.createSocket();
444+
configureSocket(socket);
445+
socket.connect(new InetSocketAddress(hostName, portNumber),
446+
connectionTimeout);
447+
return createFrameHandler(socket);
448+
} catch (IOException ioe) {
449+
quietTrySocketClose(socket);
450+
throw ioe;
451+
}
452+
}
453+
454+
private static void quietTrySocketClose(Socket socket) {
455+
if (socket != null)
456+
try { socket.close(); } catch (Exception _) {/*ignore exceptions*/}
443457
}
444458

445459
protected FrameHandler createFrameHandler(Socket sock)
@@ -464,21 +478,31 @@ protected void configureSocket(Socket socket) throws IOException{
464478
socket.setTcpNoDelay(true);
465479
}
466480

481+
/**
482+
* Create a new broker connection
483+
* @param addrs an array of known broker addresses (hostname/port pairs) to try in order
484+
* @return an interface to the connection
485+
* @throws IOException if it encounters a problem
486+
*/
487+
public Connection newConnection(Address[] addrs) throws IOException {
488+
return newConnection(null, addrs);
489+
}
490+
467491
/**
468492
* Create a new broker connection
469493
* @param executor thread execution service for consumers on the connection
470494
* @param addrs an array of known broker addresses (hostname/port pairs) to try in order
471495
* @return an interface to the connection
472496
* @throws IOException if it encounters a problem
473497
*/
474-
private Connection newConnection(ExecutorService executor, Address[] addrs)
498+
public Connection newConnection(ExecutorService executor, Address[] addrs)
475499
throws IOException
476500
{
477501
IOException lastException = null;
478502
for (Address addr : addrs) {
479503
try {
480504
FrameHandler frameHandler = createFrameHandler(addr);
481-
AMQConnection conn =
505+
AMQConnection conn =
482506
new AMQConnection(username,
483507
password,
484508
frameHandler,
@@ -506,7 +530,7 @@ private Connection newConnection(ExecutorService executor, Address[] addrs)
506530
* @throws IOException if it encounters a problem
507531
*/
508532
public Connection newConnection() throws IOException {
509-
return newConnection(Executors.newFixedThreadPool(this.numConsumerThreads),
533+
return newConnection(null,
510534
new Address[] {new Address(getHost(), getPort())}
511535
);
512536
}

src/com/rabbitmq/client/impl/AMQChannel.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -99,9 +99,7 @@ public void handleFrame(Frame frame) throws IOException {
9999
* @return the wrapped exception
100100
*/
101101
public static IOException wrap(ShutdownSignalException ex) {
102-
IOException ioe = new IOException();
103-
ioe.initCause(ex);
104-
return ioe;
102+
return wrap(ex, null);
105103
}
106104

107105
public static IOException wrap(ShutdownSignalException ex, String message) {
@@ -153,13 +151,17 @@ public void handleCompleteInboundCommand(AMQCommand command) throws IOException
153151
public void enqueueRpc(RpcContinuation k)
154152
{
155153
synchronized (_channelMutex) {
154+
boolean waitClearedInterruptStatus = false;
156155
while (_activeRpc != null) {
157156
try {
158157
_channelMutex.wait();
159158
} catch (InterruptedException e) {
160-
Thread.currentThread().interrupt();
159+
waitClearedInterruptStatus = true;
161160
}
162161
}
162+
if (waitClearedInterruptStatus) {
163+
Thread.currentThread().interrupt();
164+
}
163165
_activeRpc = k;
164166
}
165167
}

src/com/rabbitmq/client/impl/AMQCommand.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,7 @@ private static StringBuilder contentBodyStringBuilder(byte[] body, boolean suppr
146146
if (suppressBody) {
147147
return new StringBuilder().append(body.length).append(" bytes of payload");
148148
} else {
149-
return new StringBuilder().append('\"').append(body).append('\"');
149+
return new StringBuilder().append('\"').append(new String(body, "UTF-8")).append('\"');
150150
}
151151
} catch (Exception e) {
152152
return new StringBuilder().append('|').append(body.length).append('|');

0 commit comments

Comments
 (0)