5151import com .rabbitmq .utility .BlockingCell ;
5252import com .rabbitmq .utility .Utility ;
5353
54+ import javax .security .sasl .SaslClient ;
55+
5456/**
5557 * Concrete class representing and managing an AMQP connection to a broker.
5658 * <p>
@@ -152,7 +154,7 @@ public void ensureIsOpen()
152154 */
153155 private int _heartbeat ;
154156
155- private final String _username , _password , _virtualHost ;
157+ private final String _virtualHost ;
156158 private final int _requestedChannelMax , _requestedFrameMax , _requestedHeartbeat ;
157159 private final Map <String , Object > _clientProperties ;
158160
@@ -200,8 +202,6 @@ public AMQConnection(ConnectionFactory factory,
200202 {
201203 checkPreconditions ();
202204
203- _username = factory .getUsername ();
204- _password = factory .getPassword ();
205205 _virtualHost = factory .getVirtualHost ();
206206 _requestedChannelMax = factory .getRequestedChannelMax ();
207207 _requestedFrameMax = factory .getRequestedFrameMax ();
@@ -255,8 +255,9 @@ public void start()
255255 ml .setName ("AMQP Connection " + getHost () + ":" + getPort ());
256256 ml .start ();
257257
258+ AMQP .Connection .Start connStart = null ;
258259 try {
259- AMQP . Connection . Start connStart =
260+ connStart =
260261 (AMQP .Connection .Start ) connStartBlocker .getReply ().getMethod ();
261262
262263 _serverProperties = connStart .getServerProperties ();
@@ -274,18 +275,42 @@ public void start()
274275 throw AMQChannel .wrap (sse );
275276 }
276277
277- LongString saslResponse = LongStringHelper .asLongString ("\0 " + _username +
278- "\0 " + _password );
279- AMQImpl .Connection .StartOk startOk =
280- new AMQImpl .Connection .StartOk (_clientProperties , "PLAIN" ,
281- saslResponse , "en_US" );
278+ String [] mechanisms = connStart .getMechanisms ().toString ().split (" " );
279+ SaslClient sc = _factory .getSaslConfig ().getSaslClient (mechanisms );
280+ if (sc == null ) {
281+ throw new IOException ("No compatible authentication mechanism found - " +
282+ "server offered [" + connStart .getMechanisms () + "]" );
283+ }
282284
285+ LongString challenge = null ;
286+ LongString response = LongStringHelper .asLongString (
287+ sc .hasInitialResponse () ? sc .evaluateChallenge (new byte [0 ]) : null );
283288 AMQP .Connection .Tune connTune = null ;
289+ do {
290+ Method method = (challenge == null )
291+ ? new AMQImpl .Connection .StartOk (_clientProperties ,
292+ sc .getMechanismName (),
293+ response , "en_US" )
294+ : new AMQImpl .Connection .SecureOk (response );
284295
285- try {
286- connTune = (AMQP .Connection .Tune ) _channel0 .rpc (startOk ).getMethod ();
287- } catch (ShutdownSignalException e ) {
288- throw new PossibleAuthenticationFailureException (e );
296+ try {
297+ Method serverResponse = _channel0 .rpc (method ).getMethod ();
298+ if (serverResponse instanceof AMQP .Connection .Tune ) {
299+ connTune = (AMQP .Connection .Tune ) serverResponse ;
300+ } else {
301+ challenge = ((AMQP .Connection .Secure ) serverResponse ).getChallenge ();
302+ response = LongStringHelper .asLongString (sc .evaluateChallenge (challenge .getBytes ()));
303+ }
304+ } catch (ShutdownSignalException e ) {
305+ throw new PossibleAuthenticationFailureException (e );
306+ }
307+ } while (connTune == null );
308+
309+ sc .dispose ();
310+
311+ if (!sc .isComplete ()) {
312+ throw new RuntimeException (sc .getMechanismName () +
313+ " did not complete, server thought it did" );
289314 }
290315
291316 int channelMax =
@@ -714,6 +739,6 @@ public void close(int closeCode,
714739 }
715740
716741 @ Override public String toString () {
717- return "amqp://" + _username + "@" + getHost () + ":" + getPort () + _virtualHost ;
742+ return "amqp://" + _factory . getUsername () + "@" + getHost () + ":" + getPort () + _virtualHost ;
718743 }
719744}
0 commit comments