Skip to content

Commit 1402caa

Browse files
committed
add support for schema registry
1 parent 20e1899 commit 1402caa

File tree

4 files changed

+180
-5
lines changed

4 files changed

+180
-5
lines changed

README.md

Lines changed: 37 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,9 @@
22

33
Apache Kafka client library providing additional integrations relating to OAuth/OIDC integrations with Confluent Cloud and Apache Kafka.
44

5-
## Authenticating to Confluent Cloud via OAuth, using Azure Managed Identites
5+
## Authenticating to Confluent Cloud via OAuth, using Azure Managed Identities
66

7-
Example Kafka client config and JAAS config:
7+
Example Kafka client config and JAAS config for authenticating to Confluent Cloud using Azure Managed Identities / Pod Identity:
88

99
```
1010
bootstrap.servers=pkc-xxxxx.ap-southeast-2.aws.confluent.cloud:9092
@@ -19,3 +19,38 @@ sasl.jaas.config= \
1919
extension_logicalCluster='lkc-xxxxxx' \
2020
extension_identityPoolId='pool-xxxx';
2121
```
22+
23+
Use Azure K8s Workload Identities:
24+
25+
```
26+
bootstrap.servers=pkc-xxxxx.ap-southeast-2.aws.confluent.cloud:9092
27+
security.protocol=SASL_SSL
28+
sasl.oauthbearer.token.endpoint.url=${AZURE_AUTHORITY_HOST}${AZURE_TENANT_ID}/oauth2/v2.0/token
29+
sasl.login.callback.handler.class=io.confluent.oauth.azure.managedidentity.OAuthBearerLoginCallbackHandler
30+
sasl.mechanism=OAUTHBEARER
31+
sasl.jaas.config= \
32+
org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
33+
clientId='ignored' \
34+
clientSecret='ignored' \
35+
useWorkloadIdentity='true' \
36+
extension_logicalCluster='lkc-xxxxxx' \
37+
extension_identityPoolId='pool-xxxx';
38+
```
39+
40+
41+
42+
Use with Schema Registry
43+
44+
example:
45+
```
46+
echo '{"make": "Ford", "model": "Mustang", "price": 10000}' |kafka-avro-console-producer \
47+
--bootstrap-server <bootstrap>.confluent.cloud:9092 \
48+
--property schema.registry.url=https://<registry>.confluent.cloud \
49+
--property bearer.auth.credentials.source='CUSTOM' \
50+
--property bearer.auth.custom.provider.class=io.confluent.oauth.azure.managedidentity.RegistryBearerAuthCredentialProvider \
51+
--property bearer.auth.logical.cluster='lsrc-xxxxxx' \
52+
--producer.config client.properties \
53+
--reader-config client.properties \
54+
--topic cars \
55+
--property value.schema='{"type": "record", "name": "Car", "namespace": "io.spoud.training", "fields": [{"name": "make", "type": "string"}, {"name": "model", "type": "string"}, {"name": "price", "type": "int", "default": 0}]}'
56+
```

build.gradle

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,14 @@ version '1.1-SNAPSHOT'
77

88
repositories {
99
mavenCentral()
10+
maven {
11+
url = uri("https://packages.confluent.io/maven")
12+
}
1013
}
1114

1215
dependencies {
1316
implementation group: 'org.apache.kafka', name: 'kafka-clients', version: '3.7.0'
17+
implementation group: 'io.confluent', name: 'kafka-schema-registry-client', version: '7.6.0'
1418
implementation 'com.fasterxml.jackson.core:jackson-databind:2.14.2'
1519
implementation 'org.bitbucket.b_c:jose4j:0.9.6'
1620
implementation 'org.slf4j:slf4j-api:1.7.36'

src/main/java/io/confluent/oauth/azure/managedidentity/OAuthBearerLoginCallbackHandler.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import java.util.HashMap;
2929
import java.util.List;
3030
import java.util.Map;
31+
import java.util.Optional;
3132
import javax.net.ssl.SSLSocketFactory;
3233
import javax.security.auth.callback.Callback;
3334
import javax.security.auth.callback.UnsupportedCallbackException;
@@ -228,7 +229,7 @@ void init(AccessTokenRetriever accessTokenRetriever, AccessTokenValidator access
228229
isInitialized = true;
229230
}
230231

231-
protected AccessTokenRetriever createAccessTokenRetriever(Map<String, ?> configs,
232+
public static AccessTokenRetriever createAccessTokenRetriever(Map<String, ?> configs,
232233
String saslMechanism,
233234
Map<String, Object> jaasConfig) {
234235
final ConfigurationUtils cu = new ConfigurationUtils(configs, saslMechanism);
@@ -245,13 +246,18 @@ protected AccessTokenRetriever createAccessTokenRetriever(Map<String, ?> configs
245246
if (jou.shouldCreateSSLSocketFactory(tokenEndpointUrl))
246247
sslSocketFactory = jou.createSSLSocketFactory();
247248

249+
// make sure we have defaults since we don't get the properties for the schema registry case
250+
long loginRetryBackoffMs = Optional.ofNullable(cu.validateLong(SASL_LOGIN_RETRY_BACKOFF_MS, false)).orElse(DEFAULT_SASL_LOGIN_RETRY_BACKOFF_MAX_MS);
251+
long loginRetryBackoffMaxMs = Optional.ofNullable(cu.validateLong(SASL_LOGIN_RETRY_BACKOFF_MAX_MS, false)).orElse(DEFAULT_SASL_LOGIN_RETRY_BACKOFF_MAX_MS);
252+
253+
248254
io.confluent.oauth.HttpAccessTokenRetriever httpAccessTokenRetriever = new io.confluent.oauth.HttpAccessTokenRetriever(clientId,
249255
clientSecret,
250256
scope,
251257
sslSocketFactory,
252258
tokenEndpointUrl.toString(),
253-
cu.validateLong(SASL_LOGIN_RETRY_BACKOFF_MS),
254-
cu.validateLong(SASL_LOGIN_RETRY_BACKOFF_MAX_MS),
259+
loginRetryBackoffMs,
260+
loginRetryBackoffMaxMs,
255261
cu.validateInteger(SASL_LOGIN_CONNECT_TIMEOUT_MS, false),
256262
cu.validateInteger(SASL_LOGIN_READ_TIMEOUT_MS, false),
257263
"GET",
Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
package io.confluent.oauth.azure.managedidentity;
2+
3+
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClientConfig;
4+
import io.confluent.kafka.schemaregistry.client.security.bearerauth.BearerAuthCredentialProvider;
5+
import org.apache.kafka.common.KafkaException;
6+
import org.apache.kafka.common.config.ConfigException;
7+
import org.apache.kafka.common.config.SaslConfigs;
8+
import org.apache.kafka.common.config.types.Password;
9+
import org.apache.kafka.common.security.JaasContext;
10+
import org.apache.kafka.common.security.oauthbearer.OAuthBearerToken;
11+
import org.apache.kafka.common.security.oauthbearer.internals.secured.*;
12+
import org.slf4j.Logger;
13+
import org.slf4j.LoggerFactory;
14+
15+
import javax.security.auth.login.AppConfigurationEntry;
16+
import java.io.IOException;
17+
import java.net.URL;
18+
import java.util.*;
19+
20+
import static io.confluent.oauth.azure.managedidentity.OAuthBearerLoginCallbackHandler.createAccessTokenRetriever;
21+
22+
public class RegistryBearerAuthCredentialProvider implements BearerAuthCredentialProvider {
23+
24+
private static final Logger log = LoggerFactory.getLogger(RegistryBearerAuthCredentialProvider.class);
25+
public static final String SASL_IDENTITY_POOL_CONFIG = "extension_identityPoolId";
26+
27+
private String targetSchemaRegistry;
28+
private String targetIdentityPoolId;
29+
private Map<String, Object> moduleOptions;
30+
31+
private AccessTokenRetriever accessTokenRetriever;
32+
private AccessTokenValidator accessTokenValidator;
33+
private boolean isInitialized;
34+
35+
36+
@Override
37+
public void configure(Map<String, ?> configs) {
38+
// from SaslOauthCredentialProvider
39+
Map<String, Object> updatedConfigs = getConfigsForJaasUtil(configs);
40+
JaasContext jaasContext = JaasContext.loadClientContext(updatedConfigs);
41+
List<AppConfigurationEntry> appConfigurationEntries = jaasContext.configurationEntries();
42+
Map<String, ?> jaasconfig;
43+
if (Objects.requireNonNull(appConfigurationEntries).size() == 1
44+
&& appConfigurationEntries.get(0) != null) {
45+
jaasconfig = Collections.unmodifiableMap(
46+
((AppConfigurationEntry) appConfigurationEntries.get(0)).getOptions());
47+
} else {
48+
throw new ConfigException(
49+
String.format("Must supply exactly 1 non-null JAAS mechanism configuration (size was %d)",
50+
appConfigurationEntries.size()));
51+
}
52+
53+
// make sure we have scope and sub set
54+
Map<String, Object> myConfigs = new HashMap<>(configs);
55+
myConfigs.put(SaslConfigs.SASL_OAUTHBEARER_SCOPE_CLAIM_NAME, "scope");
56+
myConfigs.put(SaslConfigs.SASL_OAUTHBEARER_SUB_CLAIM_NAME, "sub");
57+
58+
59+
ConfigurationUtils cu = new ConfigurationUtils(myConfigs);
60+
JaasOptionsUtils jou = new JaasOptionsUtils((Map<String, Object>) jaasconfig);
61+
62+
targetSchemaRegistry = cu.validateString(
63+
SchemaRegistryClientConfig.BEARER_AUTH_LOGICAL_CLUSTER, false);
64+
65+
// if the schema registry oauth configs are set it is given higher preference
66+
targetIdentityPoolId = cu.get(SchemaRegistryClientConfig.BEARER_AUTH_IDENTITY_POOL_ID) != null
67+
? cu.validateString(SchemaRegistryClientConfig.BEARER_AUTH_IDENTITY_POOL_ID)
68+
: jou.validateString(SASL_IDENTITY_POOL_CONFIG, false);
69+
70+
String saslMechanism = cu.validateString(SaslConfigs.SASL_MECHANISM);
71+
moduleOptions = JaasOptionsUtils.getOptions(saslMechanism, appConfigurationEntries);
72+
AccessTokenRetriever accessTokenRetriever = createAccessTokenRetriever(myConfigs, saslMechanism, moduleOptions);
73+
74+
AccessTokenValidator accessTokenValidator = AccessTokenValidatorFactory.create(myConfigs, saslMechanism);
75+
init(accessTokenRetriever, accessTokenValidator);
76+
}
77+
78+
/*
79+
* Package-visible for testing.
80+
*/
81+
82+
void init(AccessTokenRetriever accessTokenRetriever, AccessTokenValidator accessTokenValidator) {
83+
this.accessTokenRetriever = accessTokenRetriever;
84+
this.accessTokenValidator = accessTokenValidator;
85+
86+
try {
87+
this.accessTokenRetriever.init();
88+
} catch (IOException e) {
89+
throw new KafkaException("The OAuth login configuration encountered an error when initializing the AccessTokenRetriever", e);
90+
}
91+
92+
isInitialized = true;
93+
}
94+
95+
96+
@Override
97+
public String getBearerToken(URL url) {
98+
try {
99+
String accessToken = accessTokenRetriever.retrieve();
100+
OAuthBearerToken token = accessTokenValidator.validate(accessToken);
101+
return accessToken;
102+
} catch (ValidateException | IOException e) {
103+
log.warn(e.getMessage(), e);
104+
return "";
105+
}
106+
}
107+
108+
@Override
109+
public String getTargetIdentityPoolId() {
110+
return targetIdentityPoolId;
111+
}
112+
113+
@Override
114+
public String getTargetSchemaRegistry() {
115+
return targetSchemaRegistry;
116+
}
117+
118+
// from SaslOauthCredentialProvider
119+
Map<String, Object> getConfigsForJaasUtil(Map<String, ?> configs) {
120+
Map<String, Object> updatedConfigs = new HashMap<>(configs);
121+
if (updatedConfigs.containsKey(SaslConfigs.SASL_JAAS_CONFIG)) {
122+
Object saslJaasConfig = updatedConfigs.get(SaslConfigs.SASL_JAAS_CONFIG);
123+
if (saslJaasConfig instanceof String) {
124+
updatedConfigs.put(SaslConfigs.SASL_JAAS_CONFIG, new Password((String) saslJaasConfig));
125+
}
126+
}
127+
return updatedConfigs;
128+
}
129+
130+
}

0 commit comments

Comments
 (0)