2121import org .apache .http .HttpHost ;
2222import org .apache .http .auth .AuthScope ;
2323import org .apache .http .auth .UsernamePasswordCredentials ;
24+ import org .apache .http .client .AuthCache ;
2425import org .apache .http .client .ClientProtocolException ;
2526import org .apache .http .client .CredentialsProvider ;
2627import org .apache .http .client .ResponseHandler ;
2930import org .apache .http .client .methods .HttpGet ;
3031import org .apache .http .client .methods .HttpPost ;
3132import org .apache .http .client .methods .HttpPut ;
33+ import org .apache .http .client .protocol .HttpClientContext ;
3234import org .apache .http .client .utils .URIBuilder ;
3335import org .apache .http .entity .StringEntity ;
36+ import org .apache .http .impl .auth .BasicScheme ;
37+ import org .apache .http .impl .client .BasicAuthCache ;
3438import org .apache .http .impl .client .BasicCredentialsProvider ;
3539import org .apache .http .impl .client .CloseableHttpClient ;
3640import org .apache .http .impl .client .HttpClientBuilder ;
4549import org .sourcelab .kafka .connect .apiclient .rest .handlers .RestResponseHandler ;
4650
4751import java .io .IOException ;
52+ import java .net .MalformedURLException ;
4853import java .net .SocketException ;
54+ import java .net .URI ;
4955import java .net .URISyntaxException ;
56+ import java .net .URL ;
5057import java .nio .charset .StandardCharsets ;
5158import java .util .Arrays ;
5259import java .util .Collection ;
@@ -78,6 +85,8 @@ public class HttpClientRestClient implements RestClient {
7885 */
7986 private CloseableHttpClient httpClient ;
8087
88+ private HttpClientContext httpClientContext ;
89+
8190 /**
8291 * Constructor.
8392 */
@@ -109,6 +118,15 @@ public void init(final Configuration configuration) {
109118 // Define our RequestConfigBuilder
110119 final RequestConfig .Builder requestConfigBuilder = RequestConfig .custom ();
111120
121+ // Define our Credentials Provider
122+ final CredentialsProvider credsProvider = new BasicCredentialsProvider ();
123+
124+ // Define our context
125+ httpClientContext = HttpClientContext .create ();
126+
127+ // Define our auth cache
128+ final AuthCache authCache = new BasicAuthCache ();
129+
112130 // If we have a configured proxy host
113131 if (configuration .getProxyHost () != null ) {
114132 // Define proxy host
@@ -120,21 +138,53 @@ public void init(final Configuration configuration) {
120138
121139 // If we have proxy auth enabled
122140 if (configuration .getProxyUsername () != null ) {
123- // Create credential provider
124- final CredentialsProvider credsProvider = new BasicCredentialsProvider ();
141+ // Add proxy credentials
125142 credsProvider .setCredentials (
126143 new AuthScope (configuration .getProxyHost (), configuration .getProxyPort ()),
127144 new UsernamePasswordCredentials (configuration .getProxyUsername (), configuration .getProxyPassword ())
128145 );
129146
130- // Attach Credentials provider to client builder.
131- clientBuilder .setDefaultCredentialsProvider (credsProvider );
147+ // Preemptive load context with authentication.
148+ authCache .put (
149+ new HttpHost (configuration .getProxyHost (), configuration .getProxyPort (), configuration .getProxyScheme ()), new BasicScheme ()
150+ );
132151 }
133152
134153 // Attach Proxy to request config builder
135154 requestConfigBuilder .setProxy (proxyHost );
136155 }
137156
157+ // If BasicAuth credentials are configured.
158+ if (configuration .getBasicAuthUsername () != null ) {
159+ try {
160+ // parse ApiHost for Hostname and port.
161+ final URL apiUrl = new URL (configuration .getApiHost ());
162+
163+ // Add Kafka-Connect credentials
164+ credsProvider .setCredentials (
165+ new AuthScope (apiUrl .getHost (), apiUrl .getPort ()),
166+ new UsernamePasswordCredentials (
167+ configuration .getBasicAuthUsername (),
168+ configuration .getBasicAuthPassword ()
169+ )
170+ );
171+
172+ // Preemptive load context with authentication.
173+ authCache .put (
174+ new HttpHost (apiUrl .getHost (), apiUrl .getPort (), apiUrl .getProtocol ()), new BasicScheme ()
175+ );
176+ } catch (final MalformedURLException exception ) {
177+ throw new RuntimeException (exception .getMessage (), exception );
178+ }
179+ }
180+
181+ // Configure context.
182+ httpClientContext .setAuthCache (authCache );
183+ httpClientContext .setCredentialsProvider (credsProvider );
184+
185+ // Attach Credentials provider to client builder.
186+ clientBuilder .setDefaultCredentialsProvider (credsProvider );
187+
138188 // Attach default request config
139189 clientBuilder .setDefaultRequestConfig (requestConfigBuilder .build ());
140190
@@ -211,7 +261,7 @@ private <T> T submitGetRequest(final String url, final Map<String, String> getPa
211261 logger .debug ("Executing request {}" , get .getRequestLine ());
212262
213263 // Execute and return
214- return httpClient .execute (get , responseHandler );
264+ return httpClient .execute (get , responseHandler , httpClientContext );
215265 } catch (final ClientProtocolException | SocketException | URISyntaxException connectionException ) {
216266 // Typically this is a connection issue.
217267 throw new ConnectionException (connectionException .getMessage (), connectionException );
@@ -244,7 +294,7 @@ private <T> T submitPostRequest(final String url, final Object requestBody, fina
244294 logger .debug ("Executing request {} with {}" , post .getRequestLine (), jsonPayloadStr );
245295
246296 // Execute and return
247- return httpClient .execute (post , responseHandler );
297+ return httpClient .execute (post , responseHandler , httpClientContext );
248298 } catch (final ClientProtocolException | SocketException connectionException ) {
249299 // Typically this is a connection issue.
250300 throw new ConnectionException (connectionException .getMessage (), connectionException );
@@ -276,7 +326,7 @@ private <T> T submitPutRequest(final String url, final Object requestBody, final
276326 logger .debug ("Executing request {} with {}" , put .getRequestLine (), jsonPayloadStr );
277327
278328 // Execute and return
279- return httpClient .execute (put , responseHandler );
329+ return httpClient .execute (put , responseHandler , httpClientContext );
280330 } catch (final ClientProtocolException | SocketException connectionException ) {
281331 // Typically this is a connection issue.
282332 throw new ConnectionException (connectionException .getMessage (), connectionException );
@@ -307,7 +357,7 @@ private <T> T submitDeleteRequest(final String url, final Object requestBody, fi
307357 logger .debug ("Executing request {} with {}" , delete .getRequestLine (), jsonPayloadStr );
308358
309359 // Execute and return
310- return httpClient .execute (delete , responseHandler );
360+ return httpClient .execute (delete , responseHandler , httpClientContext );
311361 } catch (final ClientProtocolException | SocketException connectionException ) {
312362 // Typically this is a connection issue.
313363 throw new ConnectionException (connectionException .getMessage (), connectionException );
0 commit comments