Skip to content

Commit 309c392

Browse files
author
Simon MacMullen
committed
Merge bug14587
2 parents fe069ad + b59a4f6 commit 309c392

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

45 files changed

+2758
-326
lines changed

src/com/rabbitmq/client/Address.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ public Address(String host, int port) {
4343
*/
4444
public Address(String host) {
4545
_host = host;
46-
_port = -1;
46+
_port = ConnectionFactory.USE_DEFAULT_PORT;
4747
}
4848

4949
/**

src/com/rabbitmq/client/Connection.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -233,5 +233,4 @@ public interface Connection extends ShutdownNotifier { // rename to AMQPConnecti
233233
* Remove all {@link BlockedListener}s.
234234
*/
235235
void clearBlockedListeners();
236-
237236
}

src/com/rabbitmq/client/ConnectionFactory.java

Lines changed: 104 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,6 @@
2222
import java.util.Map;
2323
import java.util.concurrent.ExecutorService;
2424

25-
import java.net.InetSocketAddress;
26-
import java.net.Socket;
2725
import java.net.URI;
2826
import java.net.URISyntaxException;
2927
import java.net.URLDecoder;
@@ -34,8 +32,10 @@
3432
import javax.net.ssl.TrustManager;
3533

3634
import com.rabbitmq.client.impl.AMQConnection;
35+
import com.rabbitmq.client.impl.ConnectionParams;
3736
import com.rabbitmq.client.impl.FrameHandler;
38-
import com.rabbitmq.client.impl.SocketFrameHandler;
37+
import com.rabbitmq.client.impl.FrameHandlerFactory;
38+
import com.rabbitmq.client.impl.recovery.AutorecoveringConnection;
3939

4040
/**
4141
* Convenience "factory" class to facilitate opening a {@link Connection} to an AMQP broker.
@@ -89,6 +89,12 @@ public class ConnectionFactory implements Cloneable {
8989
private SocketFactory factory = SocketFactory.getDefault();
9090
private SaslConfig saslConfig = DefaultSaslConfig.PLAIN;
9191
private ExecutorService sharedExecutor;
92+
private SocketConfigurator socketConf = new DefaultSocketConfigurator();
93+
94+
private boolean automaticRecovery = false;
95+
private boolean topologyRecovery = true;
96+
97+
private int networkRecoveryInterval = 5000;
9298

9399
/** @return number of consumer threads in default {@link ExecutorService} */
94100
@Deprecated
@@ -112,15 +118,15 @@ public void setHost(String host) {
112118
this.host = host;
113119
}
114120

115-
private int portOrDefault(int port){
116-
if(port != USE_DEFAULT_PORT) return port;
117-
else if(isSSL()) return DEFAULT_AMQP_OVER_SSL_PORT;
121+
public static int portOrDefault(int port, boolean ssl) {
122+
if (port != USE_DEFAULT_PORT) return port;
123+
else if (ssl) return DEFAULT_AMQP_OVER_SSL_PORT;
118124
else return DEFAULT_AMQP_PORT;
119125
}
120126

121127
/** @return the default port to use for connections */
122128
public int getPort() {
123-
return portOrDefault(port);
129+
return portOrDefault(port, isSSL());
124130
}
125131

126132
/**
@@ -179,6 +185,7 @@ public void setVirtualHost(String virtualHost) {
179185
this.virtualHost = virtualHost;
180186
}
181187

188+
182189
/**
183190
* Convenience method for setting the fields in an AMQP URI: host,
184191
* port, username, password and virtual host. If any part of the
@@ -385,6 +392,27 @@ public void setSocketFactory(SocketFactory factory) {
385392
this.factory = factory;
386393
}
387394

395+
/**
396+
* Get the socket configurator.
397+
*
398+
* @see #setSocketConfigurator(SocketConfigurator)
399+
*/
400+
@SuppressWarnings("unused")
401+
public SocketConfigurator getSocketConfigurator() {
402+
return socketConf;
403+
}
404+
405+
/**
406+
* Set the socket configurator. This gets a chance to "configure" a socket
407+
* after it has been opened. The default socket configurator disables
408+
* Nagle's algorithm.
409+
*
410+
* @param socketConfigurator the configurator to use
411+
*/
412+
public void setSocketConfigurator(SocketConfigurator socketConfigurator) {
413+
this.socketConf = socketConfigurator;
414+
}
415+
388416
/**
389417
* Set the executor to use by default for newly created connections.
390418
* All connections that use this executor share it.
@@ -447,49 +475,41 @@ public void useSslProtocol(SSLContext context)
447475
setSocketFactory(context.getSocketFactory());
448476
}
449477

450-
protected FrameHandler createFrameHandler(Address addr)
451-
throws IOException {
452-
453-
String hostName = addr.getHost();
454-
int portNumber = portOrDefault(addr.getPort());
455-
Socket socket = null;
456-
try {
457-
socket = factory.createSocket();
458-
configureSocket(socket);
459-
socket.connect(new InetSocketAddress(hostName, portNumber),
460-
connectionTimeout);
461-
return createFrameHandler(socket);
462-
} catch (IOException ioe) {
463-
quietTrySocketClose(socket);
464-
throw ioe;
465-
}
478+
/**
479+
* Returns true if automatic connection recovery is enabled, false otherwise
480+
* @return true if automatic connection recovery is enabled, false otherwise
481+
*/
482+
public boolean isAutomaticRecoveryEnabled() {
483+
return automaticRecovery;
466484
}
467485

468-
private static void quietTrySocketClose(Socket socket) {
469-
if (socket != null)
470-
try { socket.close(); } catch (Exception _) {/*ignore exceptions*/}
486+
/**
487+
* Enables or disables automatic connection recovery
488+
* @param automaticRecovery if true, enables connection recovery
489+
*/
490+
public void setAutomaticRecoveryEnabled(boolean automaticRecovery) {
491+
this.automaticRecovery = automaticRecovery;
471492
}
472493

473-
protected FrameHandler createFrameHandler(Socket sock)
474-
throws IOException
475-
{
476-
return new SocketFrameHandler(sock);
494+
/**
495+
* Returns true if topology recovery is enabled, false otherwise
496+
* @return true if topology recovery is enabled, false otherwise
497+
*/
498+
@SuppressWarnings("unused")
499+
public boolean isTopologyRecoveryEnabled() {
500+
return topologyRecovery;
477501
}
478502

479503
/**
480-
* Provides a hook to insert custom configuration of the sockets
481-
* used to connect to an AMQP server before they connect.
482-
*
483-
* The default behaviour of this method is to disable Nagle's
484-
* algorithm to get more consistently low latency. However it
485-
* may be overridden freely and there is no requirement to retain
486-
* this behaviour.
487-
*
488-
* @param socket The socket that is to be used for the Connection
504+
* Enables or disables topology recovery
505+
* @param topologyRecovery if true, enables topology recovery
489506
*/
490-
protected void configureSocket(Socket socket) throws IOException{
491-
// disable Nagle's algorithm, for more consistently low latency
492-
socket.setTcpNoDelay(true);
507+
public void setTopologyRecoveryEnabled(boolean topologyRecovery) {
508+
this.topologyRecovery = topologyRecovery;
509+
}
510+
511+
protected FrameHandlerFactory createFrameHandlerFactory() throws IOException {
512+
return new FrameHandlerFactory(connectionTimeout, factory, socketConf, isSSL());
493513
}
494514

495515
/**
@@ -507,35 +527,39 @@ public Connection newConnection(Address[] addrs) throws IOException {
507527
* @param executor thread execution service for consumers on the connection
508528
* @param addrs an array of known broker addresses (hostname/port pairs) to try in order
509529
* @return an interface to the connection
510-
* @throws IOException if it encounters a problem
530+
* @throws java.io.IOException if it encounters a problem
511531
*/
512532
public Connection newConnection(ExecutorService executor, Address[] addrs)
513533
throws IOException
514534
{
515-
IOException lastException = null;
516-
for (Address addr : addrs) {
517-
try {
518-
FrameHandler frameHandler = createFrameHandler(addr);
519-
AMQConnection conn =
520-
new AMQConnection(username,
521-
password,
522-
frameHandler,
523-
executor,
524-
virtualHost,
525-
getClientProperties(),
526-
requestedFrameMax,
527-
requestedChannelMax,
528-
requestedHeartbeat,
529-
saslConfig);
530-
conn.start();
531-
return conn;
532-
} catch (IOException e) {
533-
lastException = e;
535+
FrameHandlerFactory fhFactory = createFrameHandlerFactory();
536+
ConnectionParams params = params(executor);
537+
538+
if (isAutomaticRecoveryEnabled()) {
539+
// see com.rabbitmq.client.impl.recovery.RecoveryAwareAMQConnectionFactory#newConnection
540+
AutorecoveringConnection conn = new AutorecoveringConnection(params, fhFactory, addrs);
541+
conn.init();
542+
return conn;
543+
} else {
544+
IOException lastException = null;
545+
for (Address addr : addrs) {
546+
try {
547+
FrameHandler handler = fhFactory.create(addr);
548+
AMQConnection conn = new AMQConnection(params, handler);
549+
conn.start();
550+
return conn;
551+
} catch (IOException e) {
552+
lastException = e;
553+
}
534554
}
555+
throw (lastException != null) ? lastException : new IOException("failed to connect");
535556
}
557+
}
536558

537-
throw (lastException != null) ? lastException
538-
: new IOException("failed to connect");
559+
public ConnectionParams params(ExecutorService executor) {
560+
return new ConnectionParams(username, password, executor, virtualHost, getClientProperties(),
561+
requestedFrameMax, requestedChannelMax, requestedHeartbeat, saslConfig,
562+
networkRecoveryInterval, topologyRecovery);
539563
}
540564

541565
/**
@@ -568,4 +592,20 @@ public Connection newConnection(ExecutorService executor) throws IOException {
568592
throw new Error(e);
569593
}
570594
}
595+
596+
/**
597+
* Returns automatic connection recovery interval in milliseconds.
598+
* @return how long will automatic recovery wait before attempting to reconnect, in ms; default is 5000
599+
*/
600+
public int getNetworkRecoveryInterval() {
601+
return networkRecoveryInterval;
602+
}
603+
604+
/**
605+
* Sets connection recovery interval. Default is 5000.
606+
* @param networkRecoveryInterval how long will automatic recovery wait before attempting to reconnect, in ms
607+
*/
608+
public void setNetworkRecoveryInterval(int networkRecoveryInterval) {
609+
this.networkRecoveryInterval = networkRecoveryInterval;
610+
}
571611
}
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
package com.rabbitmq.client;
2+
3+
import java.io.IOException;
4+
import java.net.Socket;
5+
6+
public class DefaultSocketConfigurator implements SocketConfigurator {
7+
/**
8+
* Provides a hook to insert custom configuration of the sockets
9+
* used to connect to an AMQP server before they connect.
10+
*
11+
* The default behaviour of this method is to disable Nagle's
12+
* algorithm to get more consistently low latency. However it
13+
* may be overridden freely and there is no requirement to retain
14+
* this behaviour.
15+
*
16+
* @param socket The socket that is to be used for the Connection
17+
*/
18+
public void configure(Socket socket) throws IOException {
19+
// disable Nagle's algorithm, for more consistently low latency
20+
socket.setTcpNoDelay(true);
21+
}
22+
}
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
package com.rabbitmq.client;
2+
3+
/**
4+
* Provides a way to register (network, AMQP 0-9-1) connection recovery
5+
* callbacks.
6+
*/
7+
public interface Recoverable {
8+
/**
9+
* Registers a connection recovery callback.
10+
*
11+
* @param f Callback function
12+
*/
13+
public void addRecoveryListener(RecoveryListener listener);
14+
15+
public void removeRecoveryListener(RecoveryListener listener);
16+
}
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
package com.rabbitmq.client;
2+
3+
/**
4+
* A RecoveryListener receives notifications about completed automatic connection
5+
* recovery.
6+
*
7+
* @since 3.3.0
8+
*/
9+
public interface RecoveryListener {
10+
public void handleRecovery(Recoverable recoverable);
11+
}
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
package com.rabbitmq.client;
2+
3+
import java.io.IOException;
4+
import java.net.Socket;
5+
6+
public interface SocketConfigurator {
7+
/**
8+
* Provides a hook to insert custom configuration of the sockets
9+
* used to connect to an AMQP server before they connect.
10+
*/
11+
void configure(Socket socket) throws IOException;
12+
}
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
package com.rabbitmq.client;
2+
3+
/**
4+
* Indicates an exception thrown during topology recovery.
5+
*
6+
* @see com.rabbitmq.client.ConnectionFactory#setTopologyRecovery(boolean)
7+
* @since 3.3.0
8+
*/
9+
public class TopologyRecoveryException extends Exception {
10+
public TopologyRecoveryException(String message, Throwable cause) {
11+
super(message, cause);
12+
}
13+
}

0 commit comments

Comments
 (0)