Skip to content

Commit 20e1899

Browse files
committed
added support for azure workload identity PoC
1 parent 7bc4b7e commit 20e1899

File tree

5 files changed

+165
-7
lines changed

5 files changed

+165
-7
lines changed

build.gradle

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,11 @@ repositories {
1010
}
1111

1212
dependencies {
13-
implementation group: 'org.apache.kafka', name: 'kafka-clients', version: '3.6.1'
13+
implementation group: 'org.apache.kafka', name: 'kafka-clients', version: '3.7.0'
1414
implementation 'com.fasterxml.jackson.core:jackson-databind:2.14.2'
15-
implementation 'org.bitbucket.b_c:jose4j:0.9.3'
15+
implementation 'org.bitbucket.b_c:jose4j:0.9.6'
1616
implementation 'org.slf4j:slf4j-api:1.7.36'
17+
implementation 'com.azure:azure-identity:1.12.0'
1718
testImplementation 'org.junit.jupiter:junit-jupiter-api:5.8.1'
1819
testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine:5.8.1'
1920
}

src/main/java/io/confluent/oauth/HttpAccessTokenRetriever.java

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,9 @@
2121

2222
package io.confluent.oauth;
2323

24+
import com.azure.core.credential.AccessToken;
25+
import com.azure.core.credential.TokenCredential;
26+
import com.azure.core.credential.TokenRequestContext;
2427
import com.fasterxml.jackson.databind.JsonNode;
2528
import com.fasterxml.jackson.databind.ObjectMapper;
2629
import java.io.ByteArrayInputStream;
@@ -42,6 +45,11 @@
4245
import java.util.concurrent.ExecutionException;
4346
import javax.net.ssl.HttpsURLConnection;
4447
import javax.net.ssl.SSLSocketFactory;
48+
import com.azure.core.credential.AccessToken;
49+
import com.azure.core.credential.TokenCredential;
50+
import com.azure.core.credential.TokenRequestContext;
51+
import io.confluent.oauth.azure.managedidentity.utils.WorkloadIdentityUtils;
52+
import io.confluent.oauth.azure.managedidentity.utils.WorkloadIdentityUtils;
4553
import org.apache.kafka.common.KafkaException;
4654

4755

@@ -122,6 +130,8 @@ public class HttpAccessTokenRetriever implements AccessTokenRetriever {
122130

123131
private final Map<String, String> headers = new HashMap<>();
124132

133+
private final boolean useWorkloadIdentity;
134+
125135
public HttpAccessTokenRetriever(String clientId,
126136
String clientSecret,
127137
String scope,
@@ -131,9 +141,11 @@ public HttpAccessTokenRetriever(String clientId,
131141
long loginRetryBackoffMaxMs,
132142
Integer loginConnectTimeoutMs,
133143
Integer loginReadTimeoutMs,
134-
String requestMethod) {
144+
String requestMethod,
145+
boolean useWorkloadIdentity) {
135146
this.clientId = Objects.requireNonNull(clientId);
136147
this.clientSecret = Objects.requireNonNull(clientSecret);
148+
this.useWorkloadIdentity = useWorkloadIdentity;
137149
this.scope = scope;
138150
this.sslSocketFactory = sslSocketFactory;
139151
this.tokenEndpointUrl = Objects.requireNonNull(tokenEndpointUrl);
@@ -161,6 +173,17 @@ public HttpAccessTokenRetriever(String clientId,
161173

162174
@Override
163175
public String retrieve() throws IOException {
176+
// TODO: should we use the AZURE_FEDERATED_TOKEN_FILE variable to enable workload identity? And AZURE_AUTHORITY_HOST to use for the endpoint url?
177+
if (this.useWorkloadIdentity){
178+
log.debug("using workload identity to get token");
179+
// AccessToken https://github.com/Azure/azure-sdk-for-java/blob/main/sdk/core/azure-core/src/main/java/com/azure/core/credential/AccessToken.java
180+
TokenCredential workloadIdentityCredential = WorkloadIdentityUtils.createWorkloadIdentityCredentialFromEnvironment();
181+
TokenRequestContext tokenRequestContext = WorkloadIdentityUtils.createTokenRequestContextFromEnvironment(scope);
182+
AccessToken azureIdentityAccessToken = workloadIdentityCredential.getTokenSync(tokenRequestContext);
183+
log.trace("useWorkloadIdentity token, got token from AzureAD: '{}'", azureIdentityAccessToken.getToken());
184+
return azureIdentityAccessToken.getToken();
185+
}
186+
164187
String authorizationHeader = formatAuthorizationHeader(clientId, clientSecret);
165188
String requestBody = requestMethod == "GET" ? null : formatRequestBody(scope);
166189
Retry<String> retry = new Retry<>(loginRetryBackoffMs, loginRetryBackoffMaxMs);

src/main/java/io/confluent/oauth/azure/managedidentity/OAuthBearerLoginCallbackHandler.java

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,7 @@ public class OAuthBearerLoginCallbackHandler implements AuthenticateCallbackHand
167167
public static final String CLIENT_ID_CONFIG = "clientId";
168168
public static final String CLIENT_SECRET_CONFIG = "clientSecret";
169169
public static final String SCOPE_CONFIG = "scope";
170+
public static final String USE_WORKLOAD_IDENTITY_CONFIG = "useWorkloadIdentity";
170171

171172
public static final String CLIENT_ID_DOC = "The OAuth/OIDC identity provider-issued " +
172173
"client ID to uniquely identify the service account to use for authentication for " +
@@ -182,9 +183,15 @@ public class OAuthBearerLoginCallbackHandler implements AuthenticateCallbackHand
182183
"clientcredentials grant type.";
183184

184185
public static final String SCOPE_DOC = "The (optional) HTTP/HTTPS login request to the " +
185-
"token endpoint (" + SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL + ") may need to specify an " +
186-
"OAuth \"scope\". If so, the " + SCOPE_CONFIG + " is used to provide the value to " +
187-
"include with the login request.";
186+
"token endpoint (" + SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL + ") may need to specify an " +
187+
"OAuth \"scope\". If so, the " + SCOPE_CONFIG + " is used to provide the value to " +
188+
"include with the login request.";
189+
190+
191+
public static final String USE_WORKLOAD_IDENTITY_DOC = "The (optional) HTTP/HTTPS login request to the " +
192+
"token endpoint (" + SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL + ") may need to specify an " +
193+
"'Authorization' header of the Workload Identity. If so, the " + USE_WORKLOAD_IDENTITY_CONFIG +
194+
" must be set to true";
188195

189196
private static final String EXTENSION_PREFIX = "extension_";
190197

@@ -231,6 +238,7 @@ protected AccessTokenRetriever createAccessTokenRetriever(Map<String, ?> configs
231238
final String clientId = jou.validateString(CLIENT_ID_CONFIG);
232239
final String clientSecret = jou.validateString(CLIENT_SECRET_CONFIG);
233240
final String scope = jou.validateString(SCOPE_CONFIG, false);
241+
final boolean useWorkloadIdentity = jou.validateString(USE_WORKLOAD_IDENTITY_CONFIG).equalsIgnoreCase("true");
234242

235243
SSLSocketFactory sslSocketFactory = null;
236244

@@ -246,7 +254,8 @@ protected AccessTokenRetriever createAccessTokenRetriever(Map<String, ?> configs
246254
cu.validateLong(SASL_LOGIN_RETRY_BACKOFF_MAX_MS),
247255
cu.validateInteger(SASL_LOGIN_CONNECT_TIMEOUT_MS, false),
248256
cu.validateInteger(SASL_LOGIN_READ_TIMEOUT_MS, false),
249-
"GET");
257+
"GET",
258+
useWorkloadIdentity);
250259

251260
httpAccessTokenRetriever.getHeaders().put("Metadata", "true");
252261
return httpAccessTokenRetriever;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
/*
2+
MIT License
3+
4+
Copyright (c) 2023 Nikolai Seip
5+
6+
Permission is hereby granted, free of charge, to any person obtaining a copy
7+
of this software and associated documentation files (the "Software"), to deal
8+
in the Software without restriction, including without limitation the rights
9+
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10+
copies of the Software, and to permit persons to whom the Software is
11+
furnished to do so, subject to the following conditions:
12+
13+
The above copyright notice and this permission notice shall be included in all
14+
copies or substantial portions of the Software.
15+
16+
This file was copied from https://github.com/nniikkoollaaii/workload-identity-kafka-sasl-oauthbearer/blob/main/src/main/java/io/github/nniikkoollaaii/kafka/workload_identity/WorkloadIdentityKafkaClientOAuthBearerAuthenticationException.java
17+
*/
18+
package io.confluent.oauth.azure.managedidentity.utils;
19+
20+
/**
21+
* Custom runtime exception to signal errors when using Workload Identity to fetch a token from AzureAD.
22+
*/
23+
public class WorkloadIdentityKafkaClientOAuthBearerAuthenticationException extends RuntimeException {
24+
25+
public WorkloadIdentityKafkaClientOAuthBearerAuthenticationException(String message) {
26+
super(message);
27+
}
28+
}
Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
/*
2+
MIT License
3+
4+
Copyright (c) 2023 Nikolai Seip
5+
6+
Permission is hereby granted, free of charge, to any person obtaining a copy
7+
of this software and associated documentation files (the "Software"), to deal
8+
in the Software without restriction, including without limitation the rights
9+
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10+
copies of the Software, and to permit persons to whom the Software is
11+
furnished to do so, subject to the following conditions:
12+
13+
The above copyright notice and this permission notice shall be included in all
14+
copies or substantial portions of the Software.
15+
16+
This file was copied from https://github.com/nniikkoollaaii/workload-identity-kafka-sasl-oauthbearer/blob/main/src/main/java/io/github/nniikkoollaaii/kafka/workload_identity/WorkloadIdentityUtils.java
17+
*/
18+
package io.confluent.oauth.azure.managedidentity.utils;
19+
20+
import com.azure.core.credential.TokenRequestContext;
21+
import com.azure.identity.WorkloadIdentityCredential;
22+
import com.azure.identity.WorkloadIdentityCredentialBuilder;
23+
import org.slf4j.Logger;
24+
import org.slf4j.LoggerFactory;
25+
26+
public class WorkloadIdentityUtils {
27+
28+
private static final Logger log = LoggerFactory.getLogger(WorkloadIdentityUtils.class);
29+
30+
31+
// ENV vars set by AzureAD Workload Identity Mutating Admission Webhook: https://azure.github.io/azure-workload-identity/docs/installation/mutating-admission-webhook.html
32+
public static final String AZURE_AD_WORKLOAD_IDENTITY_MUTATING_ADMISSION_WEBHOOK_ENV_FEDERATED_TOKEN_FILE = "AZURE_FEDERATED_TOKEN_FILE";
33+
public static final String AZURE_AD_WORKLOAD_IDENTITY_MUTATING_ADMISSION_WEBHOOK_ENV_AUTHORITY_HOST = "AZURE_AUTHORITY_HOST";
34+
public static final String AZURE_AD_WORKLOAD_IDENTITY_MUTATING_ADMISSION_WEBHOOK_ENV_TENANT_ID = "AZURE_TENANT_ID";
35+
public static final String AZURE_AD_WORKLOAD_IDENTITY_MUTATING_ADMISSION_WEBHOOK_ENV_CLIENT_ID = "AZURE_CLIENT_ID";
36+
37+
38+
public static String getTenantId() {
39+
String tenantId = System.getenv(WorkloadIdentityUtils.AZURE_AD_WORKLOAD_IDENTITY_MUTATING_ADMISSION_WEBHOOK_ENV_TENANT_ID);
40+
if (tenantId == null || tenantId.equals(""))
41+
throw new WorkloadIdentityKafkaClientOAuthBearerAuthenticationException(String.format("Missing environment variable %s", WorkloadIdentityUtils.AZURE_AD_WORKLOAD_IDENTITY_MUTATING_ADMISSION_WEBHOOK_ENV_TENANT_ID));
42+
log.debug("Config: Tenant Id " + tenantId);
43+
return tenantId;
44+
}
45+
46+
public static String getClientId() {
47+
String clientId = System.getenv(WorkloadIdentityUtils.AZURE_AD_WORKLOAD_IDENTITY_MUTATING_ADMISSION_WEBHOOK_ENV_CLIENT_ID);
48+
if (clientId == null || clientId.equals(""))
49+
throw new WorkloadIdentityKafkaClientOAuthBearerAuthenticationException(String.format("Missing environment variable %s", WorkloadIdentityUtils.AZURE_AD_WORKLOAD_IDENTITY_MUTATING_ADMISSION_WEBHOOK_ENV_CLIENT_ID));
50+
log.debug("Config: Client Id " + clientId);
51+
return clientId;
52+
}
53+
54+
public static WorkloadIdentityCredential createWorkloadIdentityCredentialFromEnvironment() {
55+
String federatedTokeFilePath = System.getenv(WorkloadIdentityUtils.AZURE_AD_WORKLOAD_IDENTITY_MUTATING_ADMISSION_WEBHOOK_ENV_FEDERATED_TOKEN_FILE);
56+
if (federatedTokeFilePath == null || federatedTokeFilePath.equals(""))
57+
throw new WorkloadIdentityKafkaClientOAuthBearerAuthenticationException(String.format("Missing environment variable %s", WorkloadIdentityUtils.AZURE_AD_WORKLOAD_IDENTITY_MUTATING_ADMISSION_WEBHOOK_ENV_FEDERATED_TOKEN_FILE));
58+
log.debug("Config: Federated Token File Path " + federatedTokeFilePath);
59+
60+
String authorityHost = System.getenv(WorkloadIdentityUtils.AZURE_AD_WORKLOAD_IDENTITY_MUTATING_ADMISSION_WEBHOOK_ENV_AUTHORITY_HOST);
61+
if (authorityHost == null || authorityHost.equals(""))
62+
throw new WorkloadIdentityKafkaClientOAuthBearerAuthenticationException(String.format("Missing environment variable %s", WorkloadIdentityUtils.AZURE_AD_WORKLOAD_IDENTITY_MUTATING_ADMISSION_WEBHOOK_ENV_AUTHORITY_HOST));
63+
log.debug("Config: Authority host " + authorityHost);
64+
65+
String tenantId = getTenantId();
66+
String clientId = getClientId();
67+
68+
69+
WorkloadIdentityCredential workloadIdentityCredential = new WorkloadIdentityCredentialBuilder()
70+
.tokenFilePath(federatedTokeFilePath)
71+
.authorityHost(authorityHost)
72+
.clientId(clientId)
73+
.tenantId(tenantId)
74+
.build();
75+
76+
return workloadIdentityCredential;
77+
}
78+
79+
80+
public static TokenRequestContext createTokenRequestContextFromEnvironment(String scope) {
81+
82+
String tenantId = getTenantId();
83+
String clientId = getClientId();
84+
85+
//Construct a TokenRequestContext to be used be requsting a token at runtime.
86+
String usedScope = clientId + "/.default";
87+
if(scope != null && !scope.isEmpty()) {
88+
usedScope = scope;
89+
}
90+
log.debug("Config: Scope " + usedScope);
91+
TokenRequestContext tokenRequestContext = new TokenRequestContext() // TokenRequestContext: https://github.com/Azure/azure-sdk-for-java/blob/main/sdk/core/azure-core/src/main/java/com/azure/core/credential/TokenRequestContext.java
92+
.addScopes(usedScope)
93+
.setTenantId(tenantId);
94+
95+
return tokenRequestContext;
96+
}
97+
}

0 commit comments

Comments
 (0)