Skip to content

Commit 691ba4e

Browse files
committed
Merge branch 'spatula75-4.4.x-stable' into 4.4.x-stable
2 parents 2db37db + 4644e17 commit 691ba4e

File tree

8 files changed

+195
-61
lines changed

8 files changed

+195
-61
lines changed

src/main/java/com/rabbitmq/client/ConnectionFactory.java

Lines changed: 67 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -15,30 +15,44 @@
1515

1616
package com.rabbitmq.client;
1717

18-
import com.rabbitmq.client.impl.*;
18+
import static java.util.concurrent.TimeUnit.*;
19+
20+
import com.rabbitmq.client.impl.AMQConnection;
21+
import com.rabbitmq.client.impl.ConnectionParams;
22+
import com.rabbitmq.client.impl.CredentialsProvider;
23+
import com.rabbitmq.client.impl.DefaultCredentialsProvider;
24+
import com.rabbitmq.client.impl.DefaultExceptionHandler;
25+
import com.rabbitmq.client.impl.FrameHandler;
26+
import com.rabbitmq.client.impl.FrameHandlerFactory;
27+
import com.rabbitmq.client.impl.SocketFrameHandlerFactory;
1928
import com.rabbitmq.client.impl.nio.NioParams;
2029
import com.rabbitmq.client.impl.nio.SocketChannelFrameHandlerFactory;
2130
import com.rabbitmq.client.impl.recovery.AutorecoveringConnection;
22-
23-
import javax.net.SocketFactory;
24-
import javax.net.ssl.SSLContext;
25-
import javax.net.ssl.SSLSocketFactory;
26-
import javax.net.ssl.TrustManager;
2731
import java.io.IOException;
2832
import java.net.URI;
2933
import java.net.URISyntaxException;
3034
import java.net.URLDecoder;
3135
import java.security.KeyManagementException;
3236
import java.security.NoSuchAlgorithmException;
33-
import java.util.*;
34-
import java.util.concurrent.*;
35-
36-
import static java.util.concurrent.TimeUnit.*;
37+
import java.util.Arrays;
38+
import java.util.Collections;
39+
import java.util.HashMap;
40+
import java.util.List;
41+
import java.util.Map;
42+
import java.util.Properties;
43+
import java.util.concurrent.ExecutorService;
44+
import java.util.concurrent.Executors;
45+
import java.util.concurrent.ScheduledExecutorService;
46+
import java.util.concurrent.ThreadFactory;
47+
import java.util.concurrent.TimeoutException;
48+
import javax.net.SocketFactory;
49+
import javax.net.ssl.SSLContext;
50+
import javax.net.ssl.SSLSocketFactory;
51+
import javax.net.ssl.TrustManager;
3752

3853
/**
3954
* Convenience "factory" class to facilitate opening a {@link Connection} to an AMQP broker.
4055
*/
41-
4256
public class ConnectionFactory implements Cloneable {
4357

4458
/** Default user name */
@@ -85,31 +99,30 @@ public class ConnectionFactory implements Cloneable {
8599

86100
private static final String FALLBACK_TLS_PROTOCOL = "TLSv1";
87101

88-
private String username = DEFAULT_USER;
89-
private String password = DEFAULT_PASS;
90-
private String virtualHost = DEFAULT_VHOST;
91-
private String host = DEFAULT_HOST;
92-
private int port = USE_DEFAULT_PORT;
93-
private int requestedChannelMax = DEFAULT_CHANNEL_MAX;
94-
private int requestedFrameMax = DEFAULT_FRAME_MAX;
95-
private int requestedHeartbeat = DEFAULT_HEARTBEAT;
96-
private int connectionTimeout = DEFAULT_CONNECTION_TIMEOUT;
97-
private int handshakeTimeout = DEFAULT_HANDSHAKE_TIMEOUT;
98-
private int shutdownTimeout = DEFAULT_SHUTDOWN_TIMEOUT;
99-
private Map<String, Object> _clientProperties = AMQConnection.defaultClientProperties();
100-
private SocketFactory factory = SocketFactory.getDefault();
101-
private SaslConfig saslConfig = DefaultSaslConfig.PLAIN;
102+
private String virtualHost = DEFAULT_VHOST;
103+
private String host = DEFAULT_HOST;
104+
private int port = USE_DEFAULT_PORT;
105+
private int requestedChannelMax = DEFAULT_CHANNEL_MAX;
106+
private int requestedFrameMax = DEFAULT_FRAME_MAX;
107+
private int requestedHeartbeat = DEFAULT_HEARTBEAT;
108+
private int connectionTimeout = DEFAULT_CONNECTION_TIMEOUT;
109+
private int handshakeTimeout = DEFAULT_HANDSHAKE_TIMEOUT;
110+
private int shutdownTimeout = DEFAULT_SHUTDOWN_TIMEOUT;
111+
private Map<String, Object> _clientProperties = AMQConnection.defaultClientProperties();
112+
private SocketFactory factory = SocketFactory.getDefault();
113+
private SaslConfig saslConfig = DefaultSaslConfig.PLAIN;
102114
private ExecutorService sharedExecutor;
103-
private ThreadFactory threadFactory = Executors.defaultThreadFactory();
115+
private ThreadFactory threadFactory = Executors.defaultThreadFactory();
104116
// minimises the number of threads rapid closure of many
105117
// connections uses, see rabbitmq/rabbitmq-java-client#86
106118
private ExecutorService shutdownExecutor;
107119
private ScheduledExecutorService heartbeatExecutor;
108-
private SocketConfigurator socketConf = new DefaultSocketConfigurator();
109-
private ExceptionHandler exceptionHandler = new DefaultExceptionHandler();
120+
private SocketConfigurator socketConf = new DefaultSocketConfigurator();
121+
private ExceptionHandler exceptionHandler = new DefaultExceptionHandler();
122+
private CredentialsProvider credentialsProvider = new DefaultCredentialsProvider(DEFAULT_USER, DEFAULT_PASS);
110123

111-
private boolean automaticRecovery = true;
112-
private boolean topologyRecovery = true;
124+
private boolean automaticRecovery = true;
125+
private boolean topologyRecovery = true;
113126

114127
// long is used to make sure the users can use both ints
115128
// and longs safely. It is unlikely that anybody'd need
@@ -172,33 +185,50 @@ public void setPort(int port) {
172185
* @return the AMQP user name to use when connecting to the broker
173186
*/
174187
public String getUsername() {
175-
return this.username;
188+
return credentialsProvider.getUsername();
176189
}
177190

178191
/**
179192
* Set the user name.
180193
* @param username the AMQP user name to use when connecting to the broker
181194
*/
182195
public void setUsername(String username) {
183-
this.username = username;
196+
this.credentialsProvider = new DefaultCredentialsProvider(
197+
username,
198+
this.credentialsProvider.getPassword()
199+
);
184200
}
185201

186202
/**
187203
* Retrieve the password.
188204
* @return the password to use when connecting to the broker
189205
*/
190206
public String getPassword() {
191-
return this.password;
207+
return credentialsProvider.getPassword();
192208
}
193209

194210
/**
195211
* Set the password.
196212
* @param password the password to use when connecting to the broker
197213
*/
198214
public void setPassword(String password) {
199-
this.password = password;
215+
this.credentialsProvider = new DefaultCredentialsProvider(
216+
this.credentialsProvider.getUsername(),
217+
password
218+
);
200219
}
201220

221+
/**
222+
* Set a custom credentials provider.
223+
* Default implementation uses static username and password.
224+
* @param credentialsProvider The custom implementation of CredentialsProvider to use when connecting to the broker.
225+
* @see com.rabbitmq.client.impl.DefaultCredentialsProvider
226+
* @since 4.5.0
227+
*/
228+
public void setCredentialsProvider(CredentialsProvider credentialsProvider) {
229+
this.credentialsProvider = credentialsProvider;
230+
}
231+
202232
/**
203233
* Retrieve the virtual host.
204234
* @return the virtual host to use when connecting to the broker
@@ -954,8 +984,7 @@ public Connection newConnection(ExecutorService executor, AddressResolver addres
954984
public ConnectionParams params(ExecutorService consumerWorkServiceExecutor) {
955985
ConnectionParams result = new ConnectionParams();
956986

957-
result.setUsername(username);
958-
result.setPassword(password);
987+
result.setCredentialsProvider(credentialsProvider);
959988
result.setConsumerWorkServiceExecutor(consumerWorkServiceExecutor);
960989
result.setVirtualHost(virtualHost);
961990
result.setClientProperties(getClientProperties());
@@ -1053,7 +1082,8 @@ protected AddressResolver createAddressResolver(List<Address> addresses) {
10531082

10541083
@Override public ConnectionFactory clone(){
10551084
try {
1056-
return (ConnectionFactory)super.clone();
1085+
ConnectionFactory clone = (ConnectionFactory)super.clone();
1086+
return clone;
10571087
} catch (CloneNotSupportedException e) {
10581088
throw new Error(e);
10591089
}

src/main/java/com/rabbitmq/client/impl/AMQConnection.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -129,8 +129,7 @@ public static Map<String, Object> defaultClientProperties() {
129129
private final int requestedFrameMax;
130130
private final int handshakeTimeout;
131131
private final int shutdownTimeout;
132-
private final String username;
133-
private final String password;
132+
private final CredentialsProvider credentialsProvider;
134133
private final Collection<BlockedListener> blockedListeners = new CopyOnWriteArrayList<BlockedListener>();
135134
protected final MetricsCollector metricsCollector;
136135
private final int channelRpcTimeout;
@@ -209,8 +208,7 @@ public AMQConnection(ConnectionParams params, FrameHandler frameHandler) {
209208
public AMQConnection(ConnectionParams params, FrameHandler frameHandler, MetricsCollector metricsCollector)
210209
{
211210
checkPreconditions();
212-
this.username = params.getUsername();
213-
this.password = params.getPassword();
211+
this.credentialsProvider = params.getCredentialsProvider();
214212
this._frameHandler = frameHandler;
215213
this._virtualHost = params.getVirtualHost();
216214
this._exceptionHandler = params.getExceptionHandler();
@@ -323,8 +321,10 @@ public void start()
323321
"server offered [" + connStart.getMechanisms() + "]");
324322
}
325323

324+
String username = credentialsProvider.getUsername();
325+
String password = credentialsProvider.getPassword();
326326
LongString challenge = null;
327-
LongString response = sm.handleChallenge(null, this.username, this.password);
327+
LongString response = sm.handleChallenge(null, username, password);
328328

329329
do {
330330
Method method = (challenge == null)
@@ -341,7 +341,7 @@ public void start()
341341
connTune = (AMQP.Connection.Tune) serverResponse;
342342
} else {
343343
challenge = ((AMQP.Connection.Secure) serverResponse).getChallenge();
344-
response = sm.handleChallenge(challenge, this.username, this.password);
344+
response = sm.handleChallenge(challenge, username, password);
345345
}
346346
} catch (ShutdownSignalException e) {
347347
Method shutdownMethod = e.getReason();
@@ -1021,7 +1021,7 @@ public AMQCommand transformReply(AMQCommand command) {
10211021

10221022
@Override public String toString() {
10231023
final String virtualHost = "/".equals(_virtualHost) ? _virtualHost : "/" + _virtualHost;
1024-
return "amqp://" + this.username + "@" + getHostAddress() + ":" + getPort() + virtualHost;
1024+
return "amqp://" + this.credentialsProvider.getUsername() + "@" + getHostAddress() + ":" + getPort() + virtualHost;
10251025
}
10261026

10271027
private String getHostAddress() {

src/main/java/com/rabbitmq/client/impl/ConnectionParams.java

Lines changed: 5 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,7 @@
2626
import java.util.concurrent.ThreadFactory;
2727

2828
public class ConnectionParams {
29-
private String username;
30-
private String password;
29+
private CredentialsProvider credentialsProvider;
3130
private ExecutorService consumerWorkServiceExecutor;
3231
private ScheduledExecutorService heartbeatExecutor;
3332
private ExecutorService shutdownExecutor;
@@ -50,12 +49,8 @@ public class ConnectionParams {
5049

5150
public ConnectionParams() {}
5251

53-
public String getUsername() {
54-
return username;
55-
}
56-
57-
public String getPassword() {
58-
return password;
52+
public CredentialsProvider getCredentialsProvider() {
53+
return credentialsProvider;
5954
}
6055

6156
public ExecutorService getConsumerWorkServiceExecutor() {
@@ -130,12 +125,8 @@ public boolean channelShouldCheckRpcResponseType() {
130125
return channelShouldCheckRpcResponseType;
131126
}
132127

133-
public void setUsername(String username) {
134-
this.username = username;
135-
}
136-
137-
public void setPassword(String password) {
138-
this.password = password;
128+
public void setCredentialsProvider(CredentialsProvider credentialsProvider) {
129+
this.credentialsProvider = credentialsProvider;
139130
}
140131

141132
public void setConsumerWorkServiceExecutor(ExecutorService consumerWorkServiceExecutor) {
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
package com.rabbitmq.client.impl;
2+
3+
/**
4+
* Provider interface for establishing credentials for connecting to the broker. Especially useful
5+
* for situations where credentials might change before a recovery takes place or where it is
6+
* convenient to plug in an outside custom implementation.
7+
*
8+
* @since 4.5.0
9+
*/
10+
public interface CredentialsProvider {
11+
12+
String getUsername();
13+
14+
String getPassword();
15+
16+
}
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
package com.rabbitmq.client.impl;
2+
3+
/**
4+
* Default implementation of a CredentialsProvider which simply holds a static
5+
* username and password.
6+
*
7+
* @since 4.5.0
8+
*/
9+
public class DefaultCredentialsProvider implements CredentialsProvider {
10+
11+
private final String username;
12+
private final String password;
13+
14+
public DefaultCredentialsProvider(String username, String password) {
15+
this.username = username;
16+
this.password = password;
17+
}
18+
19+
@Override
20+
public String getUsername() {
21+
return username;
22+
}
23+
24+
@Override
25+
public String getPassword() {
26+
return password;
27+
}
28+
29+
}

src/test/java/com/rabbitmq/client/test/ConnectionFactoryTest.java

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import com.rabbitmq.client.MetricsCollector;
2222
import com.rabbitmq.client.impl.AMQConnection;
2323
import com.rabbitmq.client.impl.ConnectionParams;
24+
import com.rabbitmq.client.impl.CredentialsProvider;
2425
import com.rabbitmq.client.impl.FrameHandler;
2526
import com.rabbitmq.client.impl.FrameHandlerFactory;
2627
import org.junit.Test;
@@ -29,8 +30,9 @@
2930
import java.util.Queue;
3031
import java.util.concurrent.ArrayBlockingQueue;
3132
import java.util.concurrent.TimeoutException;
33+
import java.util.concurrent.atomic.AtomicBoolean;
3234

33-
import static org.junit.Assert.assertSame;
35+
import static org.junit.Assert.*;
3436
import static org.mockito.Mockito.*;
3537

3638
public class ConnectionFactoryTest {
@@ -62,5 +64,30 @@ protected synchronized FrameHandlerFactory createFrameHandlerFactory() throws IO
6264
);
6365
assertSame(connectionThatSucceeds, returnedConnection);
6466
}
67+
68+
// see https://github.com/rabbitmq/rabbitmq-java-client/pull/350
69+
@Test public void customizeCredentialsProvider() throws Exception {
70+
final CredentialsProvider provider = mock(CredentialsProvider.class);
71+
final AMQConnection connection = mock(AMQConnection.class);
72+
final AtomicBoolean createCalled = new AtomicBoolean(false);
73+
74+
ConnectionFactory connectionFactory = new ConnectionFactory() {
75+
@Override
76+
protected AMQConnection createConnection(ConnectionParams params, FrameHandler frameHandler,
77+
MetricsCollector metricsCollector) {
78+
assertSame(provider, params.getCredentialsProvider());
79+
createCalled.set(true);
80+
return connection;
81+
}
82+
};
83+
connectionFactory.setCredentialsProvider(provider);
84+
connectionFactory.setAutomaticRecoveryEnabled(false);
85+
86+
doNothing().when(connection).start();
87+
88+
Connection returnedConnection = connectionFactory.newConnection();
89+
assertSame(returnedConnection, connection);
90+
assertTrue(createCalled.get());
91+
}
6592

6693
}

0 commit comments

Comments
 (0)