Skip to content

Commit e0bf40b

Browse files
committed
handle credential unavailable
1 parent 1402caa commit e0bf40b

File tree

2 files changed

+23
-18
lines changed

2 files changed

+23
-18
lines changed

README.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ sasl.jaas.config= \
3333
clientId='ignored' \
3434
clientSecret='ignored' \
3535
useWorkloadIdentity='true' \
36+
scope='${CONFLUENT_CLOUD_APP_ID}/.default' \
3637
extension_logicalCluster='lkc-xxxxxx' \
3738
extension_identityPoolId='pool-xxxx';
3839
```
@@ -43,7 +44,7 @@ Use with Schema Registry
4344

4445
example:
4546
```
46-
echo '{"make": "Ford", "model": "Mustang", "price": 10000}' |kafka-avro-console-producer \
47+
echo '{"make": "Ford", "model": "Mustang", "price": 10000}' | kafka-avro-console-producer \
4748
--bootstrap-server <bootstrap>.confluent.cloud:9092 \
4849
--property schema.registry.url=https://<registry>.confluent.cloud \
4950
--property bearer.auth.credentials.source='CUSTOM' \

src/main/java/io/confluent/oauth/HttpAccessTokenRetriever.java

Lines changed: 21 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -173,28 +173,29 @@ public HttpAccessTokenRetriever(String clientId,
173173

174174
@Override
175175
public String retrieve() throws IOException {
176-
// TODO: should we use the AZURE_FEDERATED_TOKEN_FILE variable to enable workload identity? And AZURE_AUTHORITY_HOST to use for the endpoint url?
177-
if (this.useWorkloadIdentity){
178-
log.debug("using workload identity to get token");
179-
// AccessToken https://github.com/Azure/azure-sdk-for-java/blob/main/sdk/core/azure-core/src/main/java/com/azure/core/credential/AccessToken.java
180-
TokenCredential workloadIdentityCredential = WorkloadIdentityUtils.createWorkloadIdentityCredentialFromEnvironment();
181-
TokenRequestContext tokenRequestContext = WorkloadIdentityUtils.createTokenRequestContextFromEnvironment(scope);
182-
AccessToken azureIdentityAccessToken = workloadIdentityCredential.getTokenSync(tokenRequestContext);
183-
log.trace("useWorkloadIdentity token, got token from AzureAD: '{}'", azureIdentityAccessToken.getToken());
184-
return azureIdentityAccessToken.getToken();
185-
}
176+
String responseBody;
186177

187-
String authorizationHeader = formatAuthorizationHeader(clientId, clientSecret);
188-
String requestBody = requestMethod == "GET" ? null : formatRequestBody(scope);
189-
Retry<String> retry = new Retry<>(loginRetryBackoffMs, loginRetryBackoffMaxMs);
178+
try {
179+
// TODO: should we use the AZURE_FEDERATED_TOKEN_FILE variable to enable workload identity? And AZURE_AUTHORITY_HOST to use for the endpoint url?
180+
if (this.useWorkloadIdentity){
181+
log.debug("using workload identity to get token");
182+
// AccessToken https://github.com/Azure/azure-sdk-for-java/blob/main/sdk/core/azure-core/src/main/java/com/azure/core/credential/AccessToken.java
183+
TokenCredential workloadIdentityCredential = WorkloadIdentityUtils.createWorkloadIdentityCredentialFromEnvironment();
184+
TokenRequestContext tokenRequestContext = WorkloadIdentityUtils.createTokenRequestContextFromEnvironment(scope);
185+
AccessToken azureIdentityAccessToken = workloadIdentityCredential.getTokenSync(tokenRequestContext);
186+
log.trace("useWorkloadIdentity token, got token from AzureAD: '{}'", azureIdentityAccessToken.getToken());
187+
return azureIdentityAccessToken.getToken();
188+
}
190189

191-
final Map<String, String> requestHeaders = new HashMap<>(headers);
192-
requestHeaders.put(AUTHORIZATION_HEADER, authorizationHeader);
193190

191+
String authorizationHeader = formatAuthorizationHeader(clientId, clientSecret);
192+
String requestBody = requestMethod == "GET" ? null : formatRequestBody(scope);
193+
Retry<String> retry = new Retry<>(loginRetryBackoffMs, loginRetryBackoffMaxMs);
194+
195+
final Map<String, String> requestHeaders = new HashMap<>(headers);
196+
requestHeaders.put(AUTHORIZATION_HEADER, authorizationHeader);
194197

195-
String responseBody;
196198

197-
try {
198199
responseBody = retry.execute(() -> {
199200
HttpURLConnection con = null;
200201

@@ -218,6 +219,9 @@ public String retrieve() throws IOException {
218219
throw (IOException) e.getCause();
219220
else
220221
throw new KafkaException(e.getCause());
222+
} catch (CredentialUnavailableException ex){
223+
log.error("Error getting token from AzureAD: {}", ex.getMessage());
224+
throw new KafkaException(ex);
221225
}
222226

223227
return parseAccessToken(responseBody);

0 commit comments

Comments
 (0)