Skip to content

Commit cdee068

Browse files
committed
Merge branch '4.x.x-stable' into 5.1.x-stable
Conflicts: src/main/java/com/rabbitmq/client/ConnectionFactory.java
2 parents 6481c0b + 7cd22de commit cdee068

File tree

8 files changed

+186
-50
lines changed

8 files changed

+186
-50
lines changed

src/main/java/com/rabbitmq/client/ConnectionFactory.java

Lines changed: 58 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -15,25 +15,41 @@
1515

1616
package com.rabbitmq.client;
1717

18-
import com.rabbitmq.client.impl.*;
18+
import static java.util.concurrent.TimeUnit.*;
19+
20+
import com.rabbitmq.client.impl.AMQConnection;
21+
import com.rabbitmq.client.impl.ConnectionParams;
22+
import com.rabbitmq.client.impl.CredentialsProvider;
23+
import com.rabbitmq.client.impl.DefaultCredentialsProvider;
24+
import com.rabbitmq.client.impl.DefaultExceptionHandler;
25+
import com.rabbitmq.client.impl.ErrorOnWriteListener;
26+
import com.rabbitmq.client.impl.FrameHandler;
27+
import com.rabbitmq.client.impl.FrameHandlerFactory;
28+
import com.rabbitmq.client.impl.SocketFrameHandlerFactory;
1929
import com.rabbitmq.client.impl.nio.NioParams;
2030
import com.rabbitmq.client.impl.nio.SocketChannelFrameHandlerFactory;
2131
import com.rabbitmq.client.impl.recovery.AutorecoveringConnection;
22-
23-
import javax.net.SocketFactory;
24-
import javax.net.ssl.SSLContext;
25-
import javax.net.ssl.SSLSocketFactory;
26-
import javax.net.ssl.TrustManager;
2732
import java.io.IOException;
2833
import java.net.URI;
2934
import java.net.URISyntaxException;
3035
import java.net.URLDecoder;
3136
import java.security.KeyManagementException;
3237
import java.security.NoSuchAlgorithmException;
33-
import java.util.*;
34-
import java.util.concurrent.*;
35-
36-
import static java.util.concurrent.TimeUnit.*;
38+
import java.util.Arrays;
39+
import java.util.Collections;
40+
import java.util.HashMap;
41+
import java.util.List;
42+
import java.util.Map;
43+
import java.util.Properties;
44+
import java.util.concurrent.ExecutorService;
45+
import java.util.concurrent.Executors;
46+
import java.util.concurrent.ScheduledExecutorService;
47+
import java.util.concurrent.ThreadFactory;
48+
import java.util.concurrent.TimeoutException;
49+
import javax.net.SocketFactory;
50+
import javax.net.ssl.SSLContext;
51+
import javax.net.ssl.SSLSocketFactory;
52+
import javax.net.ssl.TrustManager;
3753

3854
/**
3955
* Convenience factory class to facilitate opening a {@link Connection} to a RabbitMQ node.
@@ -42,7 +58,6 @@
4258
* Some settings that apply to connections can also be configured here
4359
* and will apply to all connections produced by this factory.
4460
*/
45-
4661
public class ConnectionFactory implements Cloneable {
4762

4863
/** Default user name */
@@ -92,8 +107,6 @@ public class ConnectionFactory implements Cloneable {
92107

93108
private static final String FALLBACK_TLS_PROTOCOL = "TLSv1";
94109

95-
private String username = DEFAULT_USER;
96-
private String password = DEFAULT_PASS;
97110
private String virtualHost = DEFAULT_VHOST;
98111
private String host = DEFAULT_HOST;
99112
private int port = USE_DEFAULT_PORT;
@@ -106,17 +119,19 @@ public class ConnectionFactory implements Cloneable {
106119
private Map<String, Object> _clientProperties = AMQConnection.defaultClientProperties();
107120
private SocketFactory socketFactory = null;
108121
private SaslConfig saslConfig = DefaultSaslConfig.PLAIN;
122+
109123
private ExecutorService sharedExecutor;
110-
private ThreadFactory threadFactory = Executors.defaultThreadFactory();
124+
private ThreadFactory threadFactory = Executors.defaultThreadFactory();
111125
// minimises the number of threads rapid closure of many
112126
// connections uses, see rabbitmq/rabbitmq-java-client#86
113127
private ExecutorService shutdownExecutor;
114128
private ScheduledExecutorService heartbeatExecutor;
115-
private SocketConfigurator socketConf = new DefaultSocketConfigurator();
116-
private ExceptionHandler exceptionHandler = new DefaultExceptionHandler();
129+
private SocketConfigurator socketConf = new DefaultSocketConfigurator();
130+
private ExceptionHandler exceptionHandler = new DefaultExceptionHandler();
131+
private CredentialsProvider credentialsProvider = new DefaultCredentialsProvider(DEFAULT_USER, DEFAULT_PASS);
117132

118-
private boolean automaticRecovery = true;
119-
private boolean topologyRecovery = true;
133+
private boolean automaticRecovery = true;
134+
private boolean topologyRecovery = true;
120135

121136
// long is used to make sure the users can use both ints
122137
// and longs safely. It is unlikely that anybody'd need
@@ -193,33 +208,50 @@ public void setPort(int port) {
193208
* @return the AMQP user name to use when connecting to the broker
194209
*/
195210
public String getUsername() {
196-
return this.username;
211+
return credentialsProvider.getUsername();
197212
}
198213

199214
/**
200215
* Set the user name.
201216
* @param username the AMQP user name to use when connecting to the broker
202217
*/
203218
public void setUsername(String username) {
204-
this.username = username;
219+
this.credentialsProvider = new DefaultCredentialsProvider(
220+
username,
221+
this.credentialsProvider.getPassword()
222+
);
205223
}
206224

207225
/**
208226
* Retrieve the password.
209227
* @return the password to use when connecting to the broker
210228
*/
211229
public String getPassword() {
212-
return this.password;
230+
return credentialsProvider.getPassword();
213231
}
214232

215233
/**
216234
* Set the password.
217235
* @param password the password to use when connecting to the broker
218236
*/
219237
public void setPassword(String password) {
220-
this.password = password;
238+
this.credentialsProvider = new DefaultCredentialsProvider(
239+
this.credentialsProvider.getUsername(),
240+
password
241+
);
221242
}
222243

244+
/**
245+
* Set a custom credentials provider.
246+
* Default implementation uses static username and password.
247+
* @param credentialsProvider The custom implementation of CredentialsProvider to use when connecting to the broker.
248+
* @see com.rabbitmq.client.impl.DefaultCredentialsProvider
249+
* @since 4.5.0
250+
*/
251+
public void setCredentialsProvider(CredentialsProvider credentialsProvider) {
252+
this.credentialsProvider = credentialsProvider;
253+
}
254+
223255
/**
224256
* Retrieve the virtual host.
225257
* @return the virtual host to use when connecting to the broker
@@ -994,8 +1026,7 @@ public Connection newConnection(ExecutorService executor, AddressResolver addres
9941026
public ConnectionParams params(ExecutorService consumerWorkServiceExecutor) {
9951027
ConnectionParams result = new ConnectionParams();
9961028

997-
result.setUsername(username);
998-
result.setPassword(password);
1029+
result.setCredentialsProvider(credentialsProvider);
9991030
result.setConsumerWorkServiceExecutor(consumerWorkServiceExecutor);
10001031
result.setVirtualHost(virtualHost);
10011032
result.setClientProperties(getClientProperties());
@@ -1095,7 +1126,8 @@ protected AddressResolver createAddressResolver(List<Address> addresses) {
10951126

10961127
@Override public ConnectionFactory clone(){
10971128
try {
1098-
return (ConnectionFactory)super.clone();
1129+
ConnectionFactory clone = (ConnectionFactory)super.clone();
1130+
return clone;
10991131
} catch (CloneNotSupportedException e) {
11001132
throw new RuntimeException(e);
11011133
}
@@ -1331,7 +1363,7 @@ public boolean isChannelShouldCheckRpcResponseType() {
13311363

13321364
/**
13331365
* Timeout (in ms) for work pool enqueueing.
1334-
* The {@link WorkPool} dispatches several types of responses
1366+
* The {@link com.rabbitmq.client.impl.WorkPool} dispatches several types of responses
13351367
* from the broker (e.g. deliveries). A high-traffic
13361368
* client with slow consumers can exhaust the work pool and
13371369
* compromise the whole connection (by e.g. letting the broker

src/main/java/com/rabbitmq/client/impl/AMQConnection.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -136,8 +136,7 @@ public static Map<String, Object> defaultClientProperties() {
136136
private final int requestedFrameMax;
137137
private final int handshakeTimeout;
138138
private final int shutdownTimeout;
139-
private final String username;
140-
private final String password;
139+
private final CredentialsProvider credentialsProvider;
141140
private final Collection<BlockedListener> blockedListeners = new CopyOnWriteArrayList<BlockedListener>();
142141
protected final MetricsCollector metricsCollector;
143142
private final int channelRpcTimeout;
@@ -216,8 +215,7 @@ public AMQConnection(ConnectionParams params, FrameHandler frameHandler) {
216215
public AMQConnection(ConnectionParams params, FrameHandler frameHandler, MetricsCollector metricsCollector)
217216
{
218217
checkPreconditions();
219-
this.username = params.getUsername();
220-
this.password = params.getPassword();
218+
this.credentialsProvider = params.getCredentialsProvider();
221219
this._frameHandler = frameHandler;
222220
this._virtualHost = params.getVirtualHost();
223221
this._exceptionHandler = params.getExceptionHandler();
@@ -337,8 +335,10 @@ public void start()
337335
"server offered [" + connStart.getMechanisms() + "]");
338336
}
339337

338+
String username = credentialsProvider.getUsername();
339+
String password = credentialsProvider.getPassword();
340340
LongString challenge = null;
341-
LongString response = sm.handleChallenge(null, this.username, this.password);
341+
LongString response = sm.handleChallenge(null, username, password);
342342

343343
do {
344344
Method method = (challenge == null)
@@ -355,7 +355,7 @@ public void start()
355355
connTune = (AMQP.Connection.Tune) serverResponse;
356356
} else {
357357
challenge = ((AMQP.Connection.Secure) serverResponse).getChallenge();
358-
response = sm.handleChallenge(challenge, this.username, this.password);
358+
response = sm.handleChallenge(challenge, username, password);
359359
}
360360
} catch (ShutdownSignalException e) {
361361
Method shutdownMethod = e.getReason();
@@ -1069,7 +1069,7 @@ public AMQCommand transformReply(AMQCommand command) {
10691069

10701070
@Override public String toString() {
10711071
final String virtualHost = "/".equals(_virtualHost) ? _virtualHost : "/" + _virtualHost;
1072-
return "amqp://" + this.username + "@" + getHostAddress() + ":" + getPort() + virtualHost;
1072+
return "amqp://" + this.credentialsProvider.getUsername() + "@" + getHostAddress() + ":" + getPort() + virtualHost;
10731073
}
10741074

10751075
private String getHostAddress() {

src/main/java/com/rabbitmq/client/impl/ConnectionParams.java

Lines changed: 5 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,7 @@
2626
import java.util.concurrent.ThreadFactory;
2727

2828
public class ConnectionParams {
29-
private String username;
30-
private String password;
29+
private CredentialsProvider credentialsProvider;
3130
private ExecutorService consumerWorkServiceExecutor;
3231
private ScheduledExecutorService heartbeatExecutor;
3332
private ExecutorService shutdownExecutor;
@@ -52,12 +51,8 @@ public class ConnectionParams {
5251

5352
public ConnectionParams() {}
5453

55-
public String getUsername() {
56-
return username;
57-
}
58-
59-
public String getPassword() {
60-
return password;
54+
public CredentialsProvider getCredentialsProvider() {
55+
return credentialsProvider;
6156
}
6257

6358
public ExecutorService getConsumerWorkServiceExecutor() {
@@ -132,12 +127,8 @@ public boolean channelShouldCheckRpcResponseType() {
132127
return channelShouldCheckRpcResponseType;
133128
}
134129

135-
public void setUsername(String username) {
136-
this.username = username;
137-
}
138-
139-
public void setPassword(String password) {
140-
this.password = password;
130+
public void setCredentialsProvider(CredentialsProvider credentialsProvider) {
131+
this.credentialsProvider = credentialsProvider;
141132
}
142133

143134
public void setConsumerWorkServiceExecutor(ExecutorService consumerWorkServiceExecutor) {
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
package com.rabbitmq.client.impl;
2+
3+
/**
4+
* Provider interface for establishing credentials for connecting to the broker. Especially useful
5+
* for situations where credentials might change before a recovery takes place or where it is
6+
* convenient to plug in an outside custom implementation.
7+
*
8+
* @since 4.5.0
9+
*/
10+
public interface CredentialsProvider {
11+
12+
String getUsername();
13+
14+
String getPassword();
15+
16+
}
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
package com.rabbitmq.client.impl;
2+
3+
/**
4+
* Default implementation of a CredentialsProvider which simply holds a static
5+
* username and password.
6+
*
7+
* @since 4.5.0
8+
*/
9+
public class DefaultCredentialsProvider implements CredentialsProvider {
10+
11+
private final String username;
12+
private final String password;
13+
14+
public DefaultCredentialsProvider(String username, String password) {
15+
this.username = username;
16+
this.password = password;
17+
}
18+
19+
@Override
20+
public String getUsername() {
21+
return username;
22+
}
23+
24+
@Override
25+
public String getPassword() {
26+
return password;
27+
}
28+
29+
}

src/test/java/com/rabbitmq/client/test/ConnectionFactoryTest.java

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import com.rabbitmq.client.MetricsCollector;
2222
import com.rabbitmq.client.impl.AMQConnection;
2323
import com.rabbitmq.client.impl.ConnectionParams;
24+
import com.rabbitmq.client.impl.CredentialsProvider;
2425
import com.rabbitmq.client.impl.FrameHandler;
2526
import com.rabbitmq.client.impl.FrameHandlerFactory;
2627
import org.junit.Test;
@@ -29,8 +30,9 @@
2930
import java.util.Queue;
3031
import java.util.concurrent.ArrayBlockingQueue;
3132
import java.util.concurrent.TimeoutException;
33+
import java.util.concurrent.atomic.AtomicBoolean;
3234

33-
import static org.junit.Assert.assertSame;
35+
import static org.junit.Assert.*;
3436
import static org.mockito.Mockito.*;
3537

3638
public class ConnectionFactoryTest {
@@ -62,5 +64,30 @@ protected synchronized FrameHandlerFactory createFrameHandlerFactory() throws IO
6264
);
6365
assertSame(connectionThatSucceeds, returnedConnection);
6466
}
67+
68+
// see https://github.com/rabbitmq/rabbitmq-java-client/pull/350
69+
@Test public void customizeCredentialsProvider() throws Exception {
70+
final CredentialsProvider provider = mock(CredentialsProvider.class);
71+
final AMQConnection connection = mock(AMQConnection.class);
72+
final AtomicBoolean createCalled = new AtomicBoolean(false);
73+
74+
ConnectionFactory connectionFactory = new ConnectionFactory() {
75+
@Override
76+
protected AMQConnection createConnection(ConnectionParams params, FrameHandler frameHandler,
77+
MetricsCollector metricsCollector) {
78+
assertSame(provider, params.getCredentialsProvider());
79+
createCalled.set(true);
80+
return connection;
81+
}
82+
};
83+
connectionFactory.setCredentialsProvider(provider);
84+
connectionFactory.setAutomaticRecoveryEnabled(false);
85+
86+
doNothing().when(connection).start();
87+
88+
Connection returnedConnection = connectionFactory.newConnection();
89+
assertSame(returnedConnection, connection);
90+
assertTrue(createCalled.get());
91+
}
6592

6693
}

0 commit comments

Comments
 (0)