Skip to content

Commit dab5aad

Browse files
author
Simon MacMullen
committed
stable to default
2 parents 986a8bb + c7dfce2 commit dab5aad

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

55 files changed

+2950
-391
lines changed

src/com/rabbitmq/client/Address.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ public Address(String host, int port) {
4343
*/
4444
public Address(String host) {
4545
_host = host;
46-
_port = -1;
46+
_port = ConnectionFactory.USE_DEFAULT_PORT;
4747
}
4848

4949
/**

src/com/rabbitmq/client/Channel.java

Lines changed: 4 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
import com.rabbitmq.client.AMQP.Tx;
2727
import com.rabbitmq.client.AMQP.Basic;
2828
import com.rabbitmq.client.AMQP.Confirm;
29-
import com.rabbitmq.client.AMQP.Channel.FlowOk;
3029

3130
/**
3231
* Public API: Interface to an AMQ channel. See the <a href="http://www.amqp.org/">spec</a> for details.
@@ -90,17 +89,11 @@ public interface Channel extends ShutdownNotifier {
9089
void close(int closeCode, String closeMessage) throws IOException;
9190

9291
/**
93-
* Set flow on the channel
94-
*
95-
* @param active if true, the server is asked to start sending. If false, the server is asked to stop sending.
96-
* @throws IOException
97-
*/
98-
FlowOk flow(boolean active) throws IOException;
99-
100-
/**
101-
* Return the current Channel.Flow settings.
92+
* Indicates whether the server has asked this client to stop
93+
* sending content-bearing commands (such as basic.publish) by
94+
* issueing a channel.flow{active=false}.
10295
*/
103-
FlowOk getFlow();
96+
boolean flowBlocked();
10497

10598
/**
10699
* Abort this channel with the {@link com.rabbitmq.client.AMQP#REPLY_SUCCESS} close code

src/com/rabbitmq/client/Connection.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -233,5 +233,4 @@ public interface Connection extends ShutdownNotifier { // rename to AMQPConnecti
233233
* Remove all {@link BlockedListener}s.
234234
*/
235235
void clearBlockedListeners();
236-
237236
}

src/com/rabbitmq/client/ConnectionFactory.java

Lines changed: 120 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,6 @@
2222
import java.util.Map;
2323
import java.util.concurrent.ExecutorService;
2424

25-
import java.net.InetSocketAddress;
26-
import java.net.Socket;
2725
import java.net.URI;
2826
import java.net.URISyntaxException;
2927
import java.net.URLDecoder;
@@ -34,8 +32,10 @@
3432
import javax.net.ssl.TrustManager;
3533

3634
import com.rabbitmq.client.impl.AMQConnection;
35+
import com.rabbitmq.client.impl.ConnectionParams;
3736
import com.rabbitmq.client.impl.FrameHandler;
38-
import com.rabbitmq.client.impl.SocketFrameHandler;
37+
import com.rabbitmq.client.impl.FrameHandlerFactory;
38+
import com.rabbitmq.client.impl.recovery.AutorecoveringConnection;
3939

4040
/**
4141
* Convenience "factory" class to facilitate opening a {@link Connection} to an AMQP broker.
@@ -88,6 +88,13 @@ public class ConnectionFactory implements Cloneable {
8888
private Map<String, Object> _clientProperties = AMQConnection.defaultClientProperties();
8989
private SocketFactory factory = SocketFactory.getDefault();
9090
private SaslConfig saslConfig = DefaultSaslConfig.PLAIN;
91+
private ExecutorService sharedExecutor;
92+
private SocketConfigurator socketConf = new DefaultSocketConfigurator();
93+
94+
private boolean automaticRecovery = false;
95+
private boolean topologyRecovery = true;
96+
97+
private int networkRecoveryInterval = 5000;
9198

9299
/** @return number of consumer threads in default {@link ExecutorService} */
93100
@Deprecated
@@ -111,15 +118,15 @@ public void setHost(String host) {
111118
this.host = host;
112119
}
113120

114-
private int portOrDefault(int port){
115-
if(port != USE_DEFAULT_PORT) return port;
116-
else if(isSSL()) return DEFAULT_AMQP_OVER_SSL_PORT;
121+
public static int portOrDefault(int port, boolean ssl) {
122+
if (port != USE_DEFAULT_PORT) return port;
123+
else if (ssl) return DEFAULT_AMQP_OVER_SSL_PORT;
117124
else return DEFAULT_AMQP_PORT;
118125
}
119126

120127
/** @return the default port to use for connections */
121128
public int getPort() {
122-
return portOrDefault(port);
129+
return portOrDefault(port, isSSL());
123130
}
124131

125132
/**
@@ -178,6 +185,7 @@ public void setVirtualHost(String virtualHost) {
178185
this.virtualHost = virtualHost;
179186
}
180187

188+
181189
/**
182190
* Convenience method for setting the fields in an AMQP URI: host,
183191
* port, username, password and virtual host. If any part of the
@@ -384,6 +392,40 @@ public void setSocketFactory(SocketFactory factory) {
384392
this.factory = factory;
385393
}
386394

395+
/**
396+
* Get the socket configurator.
397+
*
398+
* @see #setSocketConfigurator(SocketConfigurator)
399+
*/
400+
@SuppressWarnings("unused")
401+
public SocketConfigurator getSocketConfigurator() {
402+
return socketConf;
403+
}
404+
405+
/**
406+
* Set the socket configurator. This gets a chance to "configure" a socket
407+
* after it has been opened. The default socket configurator disables
408+
* Nagle's algorithm.
409+
*
410+
* @param socketConfigurator the configurator to use
411+
*/
412+
public void setSocketConfigurator(SocketConfigurator socketConfigurator) {
413+
this.socketConf = socketConfigurator;
414+
}
415+
416+
/**
417+
* Set the executor to use by default for newly created connections.
418+
* All connections that use this executor share it.
419+
*
420+
* It's developer's responsibility to shut down the executor
421+
* when it is no longer needed.
422+
*
423+
* @param executor
424+
*/
425+
public void setSharedExecutor(ExecutorService executor) {
426+
this.sharedExecutor = executor;
427+
}
428+
387429
public boolean isSSL(){
388430
return getSocketFactory() instanceof SSLSocketFactory;
389431
}
@@ -433,49 +475,41 @@ public void useSslProtocol(SSLContext context)
433475
setSocketFactory(context.getSocketFactory());
434476
}
435477

436-
protected FrameHandler createFrameHandler(Address addr)
437-
throws IOException {
438-
439-
String hostName = addr.getHost();
440-
int portNumber = portOrDefault(addr.getPort());
441-
Socket socket = null;
442-
try {
443-
socket = factory.createSocket();
444-
configureSocket(socket);
445-
socket.connect(new InetSocketAddress(hostName, portNumber),
446-
connectionTimeout);
447-
return createFrameHandler(socket);
448-
} catch (IOException ioe) {
449-
quietTrySocketClose(socket);
450-
throw ioe;
451-
}
478+
/**
479+
* Returns true if automatic connection recovery is enabled, false otherwise
480+
* @return true if automatic connection recovery is enabled, false otherwise
481+
*/
482+
public boolean isAutomaticRecoveryEnabled() {
483+
return automaticRecovery;
452484
}
453485

454-
private static void quietTrySocketClose(Socket socket) {
455-
if (socket != null)
456-
try { socket.close(); } catch (Exception _) {/*ignore exceptions*/}
486+
/**
487+
* Enables or disables automatic connection recovery
488+
* @param automaticRecovery if true, enables connection recovery
489+
*/
490+
public void setAutomaticRecoveryEnabled(boolean automaticRecovery) {
491+
this.automaticRecovery = automaticRecovery;
457492
}
458493

459-
protected FrameHandler createFrameHandler(Socket sock)
460-
throws IOException
461-
{
462-
return new SocketFrameHandler(sock);
494+
/**
495+
* Returns true if topology recovery is enabled, false otherwise
496+
* @return true if topology recovery is enabled, false otherwise
497+
*/
498+
@SuppressWarnings("unused")
499+
public boolean isTopologyRecoveryEnabled() {
500+
return topologyRecovery;
463501
}
464502

465503
/**
466-
* Provides a hook to insert custom configuration of the sockets
467-
* used to connect to an AMQP server before they connect.
468-
*
469-
* The default behaviour of this method is to disable Nagle's
470-
* algorithm to get more consistently low latency. However it
471-
* may be overridden freely and there is no requirement to retain
472-
* this behaviour.
473-
*
474-
* @param socket The socket that is to be used for the Connection
504+
* Enables or disables topology recovery
505+
* @param topologyRecovery if true, enables topology recovery
475506
*/
476-
protected void configureSocket(Socket socket) throws IOException{
477-
// disable Nagle's algorithm, for more consistently low latency
478-
socket.setTcpNoDelay(true);
507+
public void setTopologyRecoveryEnabled(boolean topologyRecovery) {
508+
this.topologyRecovery = topologyRecovery;
509+
}
510+
511+
protected FrameHandlerFactory createFrameHandlerFactory() throws IOException {
512+
return new FrameHandlerFactory(connectionTimeout, factory, socketConf, isSSL());
479513
}
480514

481515
/**
@@ -485,43 +519,47 @@ protected void configureSocket(Socket socket) throws IOException{
485519
* @throws IOException if it encounters a problem
486520
*/
487521
public Connection newConnection(Address[] addrs) throws IOException {
488-
return newConnection(null, addrs);
522+
return newConnection(this.sharedExecutor, addrs);
489523
}
490524

491525
/**
492526
* Create a new broker connection
493527
* @param executor thread execution service for consumers on the connection
494528
* @param addrs an array of known broker addresses (hostname/port pairs) to try in order
495529
* @return an interface to the connection
496-
* @throws IOException if it encounters a problem
530+
* @throws java.io.IOException if it encounters a problem
497531
*/
498532
public Connection newConnection(ExecutorService executor, Address[] addrs)
499533
throws IOException
500534
{
501-
IOException lastException = null;
502-
for (Address addr : addrs) {
503-
try {
504-
FrameHandler frameHandler = createFrameHandler(addr);
505-
AMQConnection conn =
506-
new AMQConnection(username,
507-
password,
508-
frameHandler,
509-
executor,
510-
virtualHost,
511-
getClientProperties(),
512-
requestedFrameMax,
513-
requestedChannelMax,
514-
requestedHeartbeat,
515-
saslConfig);
516-
conn.start();
517-
return conn;
518-
} catch (IOException e) {
519-
lastException = e;
535+
FrameHandlerFactory fhFactory = createFrameHandlerFactory();
536+
ConnectionParams params = params(executor);
537+
538+
if (isAutomaticRecoveryEnabled()) {
539+
// see com.rabbitmq.client.impl.recovery.RecoveryAwareAMQConnectionFactory#newConnection
540+
AutorecoveringConnection conn = new AutorecoveringConnection(params, fhFactory, addrs);
541+
conn.init();
542+
return conn;
543+
} else {
544+
IOException lastException = null;
545+
for (Address addr : addrs) {
546+
try {
547+
FrameHandler handler = fhFactory.create(addr);
548+
AMQConnection conn = new AMQConnection(params, handler);
549+
conn.start();
550+
return conn;
551+
} catch (IOException e) {
552+
lastException = e;
553+
}
520554
}
555+
throw (lastException != null) ? lastException : new IOException("failed to connect");
521556
}
557+
}
522558

523-
throw (lastException != null) ? lastException
524-
: new IOException("failed to connect");
559+
public ConnectionParams params(ExecutorService executor) {
560+
return new ConnectionParams(username, password, executor, virtualHost, getClientProperties(),
561+
requestedFrameMax, requestedChannelMax, requestedHeartbeat, saslConfig,
562+
networkRecoveryInterval, topologyRecovery);
525563
}
526564

527565
/**
@@ -530,7 +568,7 @@ public Connection newConnection(ExecutorService executor, Address[] addrs)
530568
* @throws IOException if it encounters a problem
531569
*/
532570
public Connection newConnection() throws IOException {
533-
return newConnection(null,
571+
return newConnection(this.sharedExecutor,
534572
new Address[] {new Address(getHost(), getPort())}
535573
);
536574
}
@@ -554,4 +592,20 @@ public Connection newConnection(ExecutorService executor) throws IOException {
554592
throw new Error(e);
555593
}
556594
}
595+
596+
/**
597+
* Returns automatic connection recovery interval in milliseconds.
598+
* @return how long will automatic recovery wait before attempting to reconnect, in ms; default is 5000
599+
*/
600+
public int getNetworkRecoveryInterval() {
601+
return networkRecoveryInterval;
602+
}
603+
604+
/**
605+
* Sets connection recovery interval. Default is 5000.
606+
* @param networkRecoveryInterval how long will automatic recovery wait before attempting to reconnect, in ms
607+
*/
608+
public void setNetworkRecoveryInterval(int networkRecoveryInterval) {
609+
this.networkRecoveryInterval = networkRecoveryInterval;
610+
}
557611
}
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
package com.rabbitmq.client;
2+
3+
import java.io.IOException;
4+
import java.net.Socket;
5+
6+
public class DefaultSocketConfigurator implements SocketConfigurator {
7+
/**
8+
* Provides a hook to insert custom configuration of the sockets
9+
* used to connect to an AMQP server before they connect.
10+
*
11+
* The default behaviour of this method is to disable Nagle's
12+
* algorithm to get more consistently low latency. However it
13+
* may be overridden freely and there is no requirement to retain
14+
* this behaviour.
15+
*
16+
* @param socket The socket that is to be used for the Connection
17+
*/
18+
public void configure(Socket socket) throws IOException {
19+
// disable Nagle's algorithm, for more consistently low latency
20+
socket.setTcpNoDelay(true);
21+
}
22+
}

src/com/rabbitmq/client/Envelope.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,11 @@ public long getDeliveryTag() {
4848
}
4949

5050
/**
51-
* Get the redelivery flag included in this parameter envelope
51+
* Get the redelivery flag included in this parameter envelope. This is a
52+
* hint as to whether this message may have been delivered before (but not
53+
* acknowledged). If the flag is not set, the message definitely has not
54+
* been delivered before. If it is set, it may have been delivered before.
55+
*
5256
* @return the redelivery flag
5357
*/
5458
public boolean isRedeliver() {
@@ -70,4 +74,14 @@ public String getExchange() {
7074
public String getRoutingKey() {
7175
return _routingKey;
7276
}
77+
78+
@Override public String toString() {
79+
StringBuilder sb = new StringBuilder();
80+
sb.append("Envelope(deliveryTag=").append(_deliveryTag);
81+
sb.append(", redeliver=").append(_redeliver);
82+
sb.append(", exchange=").append(_exchange);
83+
sb.append(", routingKey=").append(_routingKey);
84+
sb.append(")");
85+
return sb.toString();
86+
}
7387
}

src/com/rabbitmq/client/GetResponse.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,4 +86,15 @@ public byte[] getBody() {
8686
public int getMessageCount() {
8787
return messageCount;
8888
}
89+
90+
@Override
91+
public String toString() {
92+
StringBuilder sb = new StringBuilder();
93+
sb.append("GetResponse(envelope=").append(envelope);
94+
sb.append(", props=").append(props);
95+
sb.append(", messageCount=").append(messageCount);
96+
sb.append(", body=(elided, ").append(body.length).append(" bytes long)");
97+
sb.append(")");
98+
return sb.toString();
99+
}
89100
}

0 commit comments

Comments
 (0)