Skip to content

Commit e05faf3

Browse files
authored
Merge pull request #17 from SourceLabOrg/sp/basicAuth
Add BasicAuth configuration to client
2 parents a16dc05 + c35b144 commit e05faf3

File tree

9 files changed

+213
-9
lines changed

9 files changed

+213
-9
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@ and this project adheres to [Semantic Versioning](http://semver.org/).
44

55
## 1.2.0 (UNRELEASED)
66

7+
### New Features
8+
- Added ability to authenticate with Kafka-Connect REST endpoints utilizing Basic-Authentication.
79

810
## 1.1.0 (01/30/2019)
911

pom.xml

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,15 @@
129129
<scope>test</scope>
130130
</dependency>
131131

132+
<!-- Mocking in Tests -->
133+
<dependency>
134+
<groupId>org.mockito</groupId>
135+
<artifactId>mockito-core</artifactId>
136+
<version>2.23.4</version>
137+
<scope>test</scope>
138+
</dependency>
139+
140+
132141
<!-- Data providers on tests -->
133142
<dependency>
134143
<groupId>com.tngtech.java</groupId>

src/main/java/org/sourcelab/kafka/connect/apiclient/Configuration.java

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,13 @@ public final class Configuration {
2929
// Defines the URL/Hostname of Kafka-Connect
3030
private final String apiHost;
3131

32-
// Optional Connection settings
32+
// Optional Connection options
3333
private int requestTimeoutInSeconds = 300;
3434

35+
// Optional BasicAuth options
36+
private String basicAuthUsername = null;
37+
private String basicAuthPassword = null;
38+
3539
// Optional SSL options
3640
private boolean ignoreInvalidSslCertificates = false;
3741
private File trustStoreFile = null;
@@ -64,6 +68,19 @@ public Configuration(final String kafkaConnectHost) {
6468
}
6569
}
6670

71+
/**
72+
* Allow setting http Basic-Authentication username and password to authenticate requests.
73+
*
74+
* @param username username to authenticate requests to Kafka-Connect with.
75+
* @param password password to authenticate requests to Kafka-Connect with.
76+
* @return Configuration instance.
77+
*/
78+
public Configuration useBasicAuth(final String username, final String password) {
79+
this.basicAuthUsername = username;
80+
this.basicAuthPassword = password;
81+
return this;
82+
}
83+
6784
/**
6885
* Allow setting optional proxy configuration.
6986
*
@@ -168,6 +185,14 @@ public int getRequestTimeoutInSeconds() {
168185
return requestTimeoutInSeconds;
169186
}
170187

188+
public String getBasicAuthUsername() {
189+
return basicAuthUsername;
190+
}
191+
192+
public String getBasicAuthPassword() {
193+
return basicAuthPassword;
194+
}
195+
171196
@Override
172197
public String toString() {
173198
final StringBuilder stringBuilder = new StringBuilder("Configuration{")
@@ -191,6 +216,11 @@ public String toString() {
191216
stringBuilder.append(", sslTrustStorePassword='******'");
192217
}
193218
}
219+
if (basicAuthUsername != null) {
220+
stringBuilder
221+
.append(", basicAuthUsername='").append(basicAuthUsername).append('\'')
222+
.append(", basicAuthPassword='******'");
223+
}
194224
stringBuilder.append('}');
195225

196226
return stringBuilder.toString();

src/main/java/org/sourcelab/kafka/connect/apiclient/KafkaConnectClient.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.sourcelab.kafka.connect.apiclient;
1919

20+
import org.apache.http.HttpStatus;
2021
import org.slf4j.Logger;
2122
import org.slf4j.LoggerFactory;
2223
import org.sourcelab.kafka.connect.apiclient.request.JacksonFactory;
@@ -49,6 +50,7 @@
4950
import org.sourcelab.kafka.connect.apiclient.rest.RestClient;
5051
import org.sourcelab.kafka.connect.apiclient.rest.RestResponse;
5152
import org.sourcelab.kafka.connect.apiclient.rest.exceptions.InvalidRequestException;
53+
import org.sourcelab.kafka.connect.apiclient.rest.exceptions.UnauthorizedRequestException;
5254

5355
import java.io.IOException;
5456
import java.util.Collection;
@@ -289,6 +291,19 @@ private <T> T submitRequest(final Request<T> request) {
289291
}
290292
}
291293

294+
// Server reject's client's authentication.
295+
if (responseCode == HttpStatus.SC_UNAUTHORIZED) {
296+
// Throw contextual error msg based on if credentials are configured or not.
297+
String errorMsg;
298+
if (configuration.getBasicAuthUsername() == null) {
299+
errorMsg = "Server required authentication credentials but none were provided in client configuration.";
300+
} else {
301+
errorMsg = "Client authentication credentials (username=" + configuration.getBasicAuthUsername() + ") was rejected by server.";
302+
}
303+
errorMsg = errorMsg + " Server responded with: \"" + responseStr + "\"";
304+
throw new UnauthorizedRequestException(errorMsg, responseCode);
305+
}
306+
292307
// Attempt to parse error response
293308
try {
294309
final RequestErrorResponse errorResponse = JacksonFactory.newInstance().readValue(responseStr, RequestErrorResponse.class);

src/main/java/org/sourcelab/kafka/connect/apiclient/rest/HttpClientRestClient.java

Lines changed: 57 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.apache.http.HttpHost;
2222
import org.apache.http.auth.AuthScope;
2323
import org.apache.http.auth.UsernamePasswordCredentials;
24+
import org.apache.http.client.AuthCache;
2425
import org.apache.http.client.ClientProtocolException;
2526
import org.apache.http.client.CredentialsProvider;
2627
import org.apache.http.client.ResponseHandler;
@@ -29,8 +30,11 @@
2930
import org.apache.http.client.methods.HttpGet;
3031
import org.apache.http.client.methods.HttpPost;
3132
import org.apache.http.client.methods.HttpPut;
33+
import org.apache.http.client.protocol.HttpClientContext;
3234
import org.apache.http.client.utils.URIBuilder;
3335
import org.apache.http.entity.StringEntity;
36+
import org.apache.http.impl.auth.BasicScheme;
37+
import org.apache.http.impl.client.BasicAuthCache;
3438
import org.apache.http.impl.client.BasicCredentialsProvider;
3539
import org.apache.http.impl.client.CloseableHttpClient;
3640
import org.apache.http.impl.client.HttpClientBuilder;
@@ -45,8 +49,10 @@
4549
import org.sourcelab.kafka.connect.apiclient.rest.handlers.RestResponseHandler;
4650

4751
import java.io.IOException;
52+
import java.net.MalformedURLException;
4853
import java.net.SocketException;
4954
import java.net.URISyntaxException;
55+
import java.net.URL;
5056
import java.nio.charset.StandardCharsets;
5157
import java.util.Arrays;
5258
import java.util.Collection;
@@ -78,6 +84,8 @@ public class HttpClientRestClient implements RestClient {
7884
*/
7985
private CloseableHttpClient httpClient;
8086

87+
private HttpClientContext httpClientContext;
88+
8189
/**
8290
* Constructor.
8391
*/
@@ -109,6 +117,15 @@ public void init(final Configuration configuration) {
109117
// Define our RequestConfigBuilder
110118
final RequestConfig.Builder requestConfigBuilder = RequestConfig.custom();
111119

120+
// Define our Credentials Provider
121+
final CredentialsProvider credsProvider = new BasicCredentialsProvider();
122+
123+
// Define our context
124+
httpClientContext = HttpClientContext.create();
125+
126+
// Define our auth cache
127+
final AuthCache authCache = new BasicAuthCache();
128+
112129
// If we have a configured proxy host
113130
if (configuration.getProxyHost() != null) {
114131
// Define proxy host
@@ -120,21 +137,53 @@ public void init(final Configuration configuration) {
120137

121138
// If we have proxy auth enabled
122139
if (configuration.getProxyUsername() != null) {
123-
// Create credential provider
124-
final CredentialsProvider credsProvider = new BasicCredentialsProvider();
140+
// Add proxy credentials
125141
credsProvider.setCredentials(
126142
new AuthScope(configuration.getProxyHost(), configuration.getProxyPort()),
127143
new UsernamePasswordCredentials(configuration.getProxyUsername(), configuration.getProxyPassword())
128144
);
129145

130-
// Attach Credentials provider to client builder.
131-
clientBuilder.setDefaultCredentialsProvider(credsProvider);
146+
// Preemptive load context with authentication.
147+
authCache.put(
148+
new HttpHost(configuration.getProxyHost(), configuration.getProxyPort(), configuration.getProxyScheme()), new BasicScheme()
149+
);
132150
}
133151

134152
// Attach Proxy to request config builder
135153
requestConfigBuilder.setProxy(proxyHost);
136154
}
137155

156+
// If BasicAuth credentials are configured.
157+
if (configuration.getBasicAuthUsername() != null) {
158+
try {
159+
// parse ApiHost for Hostname and port.
160+
final URL apiUrl = new URL(configuration.getApiHost());
161+
162+
// Add Kafka-Connect credentials
163+
credsProvider.setCredentials(
164+
new AuthScope(apiUrl.getHost(), apiUrl.getPort()),
165+
new UsernamePasswordCredentials(
166+
configuration.getBasicAuthUsername(),
167+
configuration.getBasicAuthPassword()
168+
)
169+
);
170+
171+
// Preemptive load context with authentication.
172+
authCache.put(
173+
new HttpHost(apiUrl.getHost(), apiUrl.getPort(), apiUrl.getProtocol()), new BasicScheme()
174+
);
175+
} catch (final MalformedURLException exception) {
176+
throw new RuntimeException(exception.getMessage(), exception);
177+
}
178+
}
179+
180+
// Configure context.
181+
httpClientContext.setAuthCache(authCache);
182+
httpClientContext.setCredentialsProvider(credsProvider);
183+
184+
// Attach Credentials provider to client builder.
185+
clientBuilder.setDefaultCredentialsProvider(credsProvider);
186+
138187
// Attach default request config
139188
clientBuilder.setDefaultRequestConfig(requestConfigBuilder.build());
140189

@@ -211,7 +260,7 @@ private <T> T submitGetRequest(final String url, final Map<String, String> getPa
211260
logger.debug("Executing request {}", get.getRequestLine());
212261

213262
// Execute and return
214-
return httpClient.execute(get, responseHandler);
263+
return httpClient.execute(get, responseHandler, httpClientContext);
215264
} catch (final ClientProtocolException | SocketException | URISyntaxException connectionException) {
216265
// Typically this is a connection issue.
217266
throw new ConnectionException(connectionException.getMessage(), connectionException);
@@ -244,7 +293,7 @@ private <T> T submitPostRequest(final String url, final Object requestBody, fina
244293
logger.debug("Executing request {} with {}", post.getRequestLine(), jsonPayloadStr);
245294

246295
// Execute and return
247-
return httpClient.execute(post, responseHandler);
296+
return httpClient.execute(post, responseHandler, httpClientContext);
248297
} catch (final ClientProtocolException | SocketException connectionException) {
249298
// Typically this is a connection issue.
250299
throw new ConnectionException(connectionException.getMessage(), connectionException);
@@ -276,7 +325,7 @@ private <T> T submitPutRequest(final String url, final Object requestBody, final
276325
logger.debug("Executing request {} with {}", put.getRequestLine(), jsonPayloadStr);
277326

278327
// Execute and return
279-
return httpClient.execute(put, responseHandler);
328+
return httpClient.execute(put, responseHandler, httpClientContext);
280329
} catch (final ClientProtocolException | SocketException connectionException) {
281330
// Typically this is a connection issue.
282331
throw new ConnectionException(connectionException.getMessage(), connectionException);
@@ -307,7 +356,7 @@ private <T> T submitDeleteRequest(final String url, final Object requestBody, fi
307356
logger.debug("Executing request {} with {}", delete.getRequestLine(), jsonPayloadStr);
308357

309358
// Execute and return
310-
return httpClient.execute(delete, responseHandler);
359+
return httpClient.execute(delete, responseHandler, httpClientContext);
311360
} catch (final ClientProtocolException | SocketException connectionException) {
312361
// Typically this is a connection issue.
313362
throw new ConnectionException(connectionException.getMessage(), connectionException);

src/main/java/org/sourcelab/kafka/connect/apiclient/rest/exceptions/InvalidRequestException.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,8 @@ public static InvalidRequestException factory(final RequestErrorResponse errorRe
6464
Objects.requireNonNull(errorResponse, "Invalid RequestErrorResponse parameter, must not be null");
6565

6666
switch (errorResponse.getErrorCode()) {
67+
case 401:
68+
return new UnauthorizedRequestException(errorResponse.getMessage(), errorResponse.getErrorCode());
6769
case 404:
6870
return new ResourceNotFoundException(errorResponse.getMessage());
6971
case 409:
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
/**
2+
* Copyright 2018, 2019 SourceLab.org https://github.com/SourceLabOrg/kafka-connect-client
3+
*
4+
* Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
5+
* documentation files (the "Software"), to deal in the Software without restriction, including without limitation the
6+
* rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit
7+
* persons to whom the Software is furnished to do so, subject to the following conditions:
8+
*
9+
* The above copyright notice and this permission notice shall be included in all copies or substantial portions of the
10+
* Software.
11+
*
12+
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE
13+
* WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
14+
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
15+
* OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
16+
*/
17+
18+
package org.sourcelab.kafka.connect.apiclient.rest.exceptions;
19+
20+
/**
21+
* Thrown if the server required Authentication, but the client was either not configured to provide credentials,
22+
* or those credentials were rejected/invalid.
23+
*/
24+
public class UnauthorizedRequestException extends InvalidRequestException {
25+
26+
/**
27+
* Constructor.
28+
* @param message Error message.
29+
* @param errorCode Error code.
30+
*/
31+
public UnauthorizedRequestException(final String message, final int errorCode) {
32+
super(message, errorCode);
33+
}
34+
}

src/test/java/org/sourcelab/kafka/connect/apiclient/KafkaConnectClientTest.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,11 @@ public void setup() {
6161
);
6262
}
6363

64+
final String basicAuthUsername = System.getenv("KAFKA_CONNECT_BASICAUTH_USERNAME");
65+
if (basicAuthUsername != null && !basicAuthUsername.isEmpty()) {
66+
configuration.useBasicAuth(basicAuthUsername, System.getenv("KAFKA_CONNECT_BASICAUTH_PASSWORD"));
67+
}
68+
6469
// Build api client
6570
this.kafkaConnectClient = new KafkaConnectClient(configuration);
6671
}
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
/**
2+
* Copyright 2018, 2019 SourceLab.org https://github.com/SourceLabOrg/kafka-connect-client
3+
*
4+
* Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
5+
* documentation files (the "Software"), to deal in the Software without restriction, including without limitation the
6+
* rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit
7+
* persons to whom the Software is furnished to do so, subject to the following conditions:
8+
*
9+
* The above copyright notice and this permission notice shall be included in all copies or substantial portions of the
10+
* Software.
11+
*
12+
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE
13+
* WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
14+
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
15+
* OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
16+
*/
17+
18+
package org.sourcelab.kafka.connect.apiclient;
19+
20+
import org.apache.http.HttpStatus;
21+
import org.junit.Test;
22+
import org.sourcelab.kafka.connect.apiclient.rest.RestClient;
23+
import org.sourcelab.kafka.connect.apiclient.rest.RestResponse;
24+
import org.sourcelab.kafka.connect.apiclient.rest.exceptions.UnauthorizedRequestException;
25+
26+
import static org.mockito.ArgumentMatchers.any;
27+
import static org.mockito.Mockito.mock;
28+
import static org.mockito.Mockito.when;
29+
30+
/**
31+
* Unit tests over KafkaConnectClient.
32+
*/
33+
public class KafkaConnectClientUnitTest {
34+
35+
/**
36+
* This test verifies that if the underlying RestClient returns a response with Http Status Code 401,
37+
* then KafkaConnectClient will throw an UnauthorizedRequestException.
38+
*/
39+
@Test(expected = UnauthorizedRequestException.class)
40+
public void unAuthorizedException() {
41+
// Create configuration
42+
final Configuration configuration = new Configuration("http://localhost:9092");
43+
44+
// Create mock RestResponse
45+
final RestResponse restResponse = new RestResponse("Invalid credentials.", HttpStatus.SC_UNAUTHORIZED);
46+
47+
// Create mock RestClient
48+
final RestClient mockRestClient = mock(RestClient.class);
49+
when(mockRestClient.submitRequest(any()))
50+
.thenReturn(restResponse);
51+
52+
// Create client
53+
final KafkaConnectClient client = new KafkaConnectClient(configuration, mockRestClient);
54+
55+
// Call any method that makes a request via RestClient.
56+
client.getConnectors();
57+
}
58+
}

0 commit comments

Comments
 (0)