6 changes: 5 additions & 1 deletion .github/workflows/ci.yml
@@ -39,10 +39,14 @@ jobs:
uses: pnpm/action-setup@v4
with:
version: latest
- name: Install kinit for Kerberos
run: |
sudo apt-get update
sudo DEBIAN_FRONTEND=noninteractive apt-get install -y krb5-user
- name: Install dependencies
run: pnpm install --frozen-lockfile
- name: Start Kafka (${{ matrix.confluent-kafka-version }}) Cluster
- run: docker compose up -d --wait
+ run: docker compose up --build --force-recreate -d --wait
env:
KAFKA_VERSION: ${{ matrix.confluent-kafka-version }}
- name: Run Tests
54 changes: 54 additions & 0 deletions docker-compose.yml
@@ -1,4 +1,26 @@
services:
kdc:
image: plt-kafka-kdc:latest
pull_policy: never
build:
context: ./docker/kerberos
container_name: kdc
ports:
- '8000:88/tcp'
- '8000:88/udp'
- '8001:749'
volumes:
- './docker/kerberos/krb5-kdc.conf:/etc/krb5.conf:ro'
- './docker/kerberos/kdc.conf:/var/lib/krb5kdc/kdc.conf:ro'
- './docker/kerberos/init.sh:/init.sh:ro'
- './tmp/kerberos:/data'
entrypoint: ['/bin/sh', '/init.sh']
healthcheck:
test: ['CMD', 'kadmin.local', '-q', 'list_principals']
interval: 10s
timeout: 5s
retries: 5

broker-single:
# Rule of thumb: Confluent Kafka Version = Apache Kafka Version + 4.0.0
image: &image confluentinc/cp-kafka:${KAFKA_VERSION:-8.0.0}
@@ -74,6 +96,38 @@ services:
KAFKA_SASL_OAUTHBEARER_EXPECTED_AUDIENCE: users
KAFKA_SASL_OAUTHBEARER_EXPECTED_SCOPE: test

broker-sasl-kerberos:
image: *image
container_name: broker-sasl-kerberos
ports:
- "9003:9092" # SASL
- "19003:19092" # PLAIN TEXT - Used to create users
healthcheck: *health_check
volumes:
- "./docker/sasl/jaas-kerberos.conf:/etc/kafka/jaas.conf:ro"
- "./docker/kerberos/krb5-broker.conf:/etc/krb5.conf:ro"
- "./tmp/kerberos/broker.keytab:/etc/kafka/broker.keytab:ro"
depends_on:
kdc:
condition: service_healthy
environment:
<<: *common_config
# Broker specific general and port options
KAFKA_LISTENERS: "SASL://:9092,DOCKER://:19092,CONTROLLER://:29092"
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "SASL:SASL_PLAINTEXT,DOCKER:PLAINTEXT,CONTROLLER:PLAINTEXT"
KAFKA_ADVERTISED_LISTENERS: "SASL://localhost:9003,DOCKER://broker-sasl-kerberos:19092"
KAFKA_CONTROLLER_QUORUM_VOTERS: "1@broker-sasl-kerberos:29092"
# SASL
KAFKA_OPTS: "-Djava.security.auth.login.config=/etc/kafka/jaas.conf -Djava.security.krb5.conf=/etc/krb5.conf"
KAFKA_LISTENER_NAME_SASL_GSSAPI_SASL_JAAS_CONFIG: 'com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab="/etc/kafka/broker.keytab" principal="broker/broker-sasl-kerberos@EXAMPLE.COM";'
KAFKA_CONNECTIONS_MAX_REAUTH_MS: 5000
KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: "false"
KAFKA_SUPER_USERS: 'User:admin;User:broker/broker-sasl-kerberos@EXAMPLE.COM;User:admin-keytab/localhost@EXAMPLE.COM;User:admin-password/localhost@EXAMPLE.COM'
KAFKA_SASL_ENABLED_MECHANISMS: "GSSAPI"
KAFKA_SASL_MECHANISM_CONTROLLER_PROTOCOL: "PLAIN"
KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL: "PLAIN"
KAFKA_SASL_KERBEROS_SERVICE_NAME: 'kafka'

broker-cluster-1:
image: *image
container_name: broker-cluster-1
2 changes: 2 additions & 0 deletions docker/kerberos/Dockerfile
@@ -0,0 +1,2 @@
FROM ubuntu:25.04
RUN apt-get update && apt-get install -y krb5-kdc krb5-admin-server && rm -rf /var/lib/apt/lists/*
10 changes: 10 additions & 0 deletions docker/kerberos/README.md
@@ -0,0 +1,10 @@
To create `kafka.keytab`:

```
ktutil
addent -password -p admin/localhost@example.com -k 1 -e aes256-cts-hmac-sha1-96
write_kt kafka.keytab
quit
```

On macOS, use `ktutil` from the `krb5` package installed via Homebrew.
29 changes: 29 additions & 0 deletions docker/kerberos/init.sh
@@ -0,0 +1,29 @@
#!/bin/sh
set -e

# Setup KDC if needed
if [ ! -f /var/lib/krb5kdc/principal ]; then

echo "Setting up KDC ..."
kdb5_util create -s -P password

# ACL file
echo "*/admin@EXAMPLE.COM *" > /etc/krb5kdc/kadm5.acl

# Create principals
kadmin.local -q "addprinc -pw admin admin@EXAMPLE.COM" # Main administrator
kadmin.local -q "addprinc -randkey broker/broker-sasl-kerberos@EXAMPLE.COM" # Kafka broker
kadmin.local -q "addprinc -randkey admin-keytab@EXAMPLE.COM" # Client with keytab
kadmin.local -q "addprinc -pw admin admin-password@EXAMPLE.COM" # Client with password

# Generate keytabs
kadmin.local -q "ktadd -k /data/broker.keytab broker/broker-sasl-kerberos@EXAMPLE.COM"
kadmin.local -q "ktadd -k /data/admin.keytab admin-keytab@EXAMPLE.COM"

# Allow other containers to read the keytab files
chown -R ubuntu:ubuntu /data
chmod -R 755 /data
fi

krb5kdc
kadmind -nofork
11 changes: 11 additions & 0 deletions docker/kerberos/kdc.conf
@@ -0,0 +1,11 @@
[kdcdefaults]
kdc_ports = 88
kdc_tcp_ports = 88

[realms]
EXAMPLE.COM = {
acl_file = /var/lib/krb5kdc/kadm5.acl
dict_file = /usr/share/dict/words
admin_keytab = /var/lib/krb5kdc/kadm5.keytab
supported_enctypes = aes256-cts:normal aes128-cts:normal
}
14 changes: 14 additions & 0 deletions docker/kerberos/krb5-broker.conf
@@ -0,0 +1,14 @@
[libdefaults]
default_realm = EXAMPLE.COM
dns_lookup_realm = false
dns_lookup_kdc = false

[realms]
EXAMPLE.COM = {
kdc = kdc:88
admin_server = kdc:749
}

[domain_realm]
.example.com = EXAMPLE.COM
example.com = EXAMPLE.COM
14 changes: 14 additions & 0 deletions docker/kerberos/krb5-kdc.conf
@@ -0,0 +1,14 @@
[libdefaults]
default_realm = EXAMPLE.COM
dns_lookup_realm = false
dns_lookup_kdc = false

[realms]
EXAMPLE.COM = {
kdc = localhost:88
admin_server = localhost:749
}

[domain_realm]
.example.com = EXAMPLE.COM
example.com = EXAMPLE.COM
9 changes: 9 additions & 0 deletions docker/sasl/jaas-kerberos.conf
@@ -0,0 +1,9 @@
KafkaServer {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
storeKey=true
keyTab="/etc/kafka/broker.keytab"
principal="broker/broker-sasl-kerberos@EXAMPLE.COM"
serviceName="kafka"
useTicketCache=false;
};
52 changes: 52 additions & 0 deletions docs/base.md
@@ -132,4 +132,56 @@ const producer = new Producer({
})
```

## Connecting to Kafka via SASL using a custom authenticator

For advanced use cases where you need full control over the SASL authentication process, you can provide a custom `authenticate` function in the `sasl` options. This allows you to implement custom authentication flows, handle complex credential management, or integrate with external authentication systems.

Example:

```javascript
import { Producer, stringSerializers } from '@platformatic/kafka'

const producer = new Producer({
clientId: 'my-producer',
bootstrapBrokers: ['localhost:9092'],
serializers: stringSerializers,
sasl: {
mechanism: 'PLAIN',
authenticate: async (mechanism, connection, authenticate, usernameProvider, passwordProvider, tokenProvider, callback) => {
try {
// Custom logic to retrieve or generate credentials
const username = typeof usernameProvider === 'function'
? await usernameProvider()
: usernameProvider
const password = typeof passwordProvider === 'function'
? await passwordProvider()
: passwordProvider

// Perform the SASL authentication
const authData = Buffer.from(`\u0000${username}\u0000${password}`)
const response = await authenticate({
authBytes: authData
})

callback(null, response)
} catch (err) {
callback(err)
}
}
}
})
```

The `authenticate` function receives the following parameters:

- `mechanism`: The SASL mechanism being used (e.g., 'PLAIN', 'SCRAM-SHA-256')
- `connection`: The Connection instance being authenticated
- `authenticate`: The SASL authentication API function to send auth bytes to the server
- `usernameProvider`: The username (string or async function) from the sasl options
- `passwordProvider`: The password (string or async function) from the sasl options
- `tokenProvider`: The token (string or async function) from the sasl options
- `callback`: A callback function to call with the authentication result

**Important**: The `authenticate` function should never throw, especially when implemented as an async function. It is not awaited by the client, so an escaped exception is never handled and can lead to memory leaks, resource leaks, and unexpected behavior. Always wrap your logic in a try/catch block and pass any error to the callback instead.
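
For instance, the same hook can be adapted to a token-based flow. The sketch below is illustrative rather than canonical: it assumes a broker listener that accepts OAUTHBEARER, uses the standard RFC 7628 initial client response, and infers the `token` option name from the parameter list above; everything else mirrors the PLAIN example.

```javascript
import { Producer, stringSerializers } from '@platformatic/kafka'

const producer = new Producer({
  clientId: 'my-producer',
  bootstrapBrokers: ['localhost:9092'],
  serializers: stringSerializers,
  sasl: {
    mechanism: 'OAUTHBEARER',
    // A static token for illustration; in practice this would be an async provider
    token: 'my-test-token',
    authenticate: async (mechanism, connection, authenticate, usernameProvider, passwordProvider, tokenProvider, callback) => {
      try {
        // The token may be a static string or an async provider function
        const token = typeof tokenProvider === 'function' ? await tokenProvider() : tokenProvider

        // RFC 7628 OAUTHBEARER initial client response: GS2 header, then auth=Bearer <token>
        const authData = Buffer.from(`n,,\u0001auth=Bearer ${token}\u0001\u0001`)
        const response = await authenticate({ authBytes: authData })

        callback(null, response)
      } catch (err) {
        // Report failures through the callback so the client can handle them
        callback(err)
      }
    }
  }
})
```

As in the PLAIN example, every code path ends in a `callback` invocation, which is exactly what the warning above requires.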

[node-socket-write]: https://nodejs.org/dist/latest/docs/api/stream.html#writablewritechunk-encoding-callback
6 changes: 4 additions & 2 deletions package.json
@@ -1,6 +1,6 @@
{
"name": "@platformatic/kafka",
"version": "1.21.0",
"version": "1.22.0-alpha.1",
"description": "Modern and performant client for Apache Kafka",
"homepage": "https://github.com/platformatic/kafka",
"author": "Platformatic Inc. <oss@platformatic.dev> (https://platformatic.dev)",
@@ -58,6 +58,7 @@
"@confluentinc/kafka-javascript": "^1.5.0",
"@platformatic/rdkafka": "^4.0.0",
"@types/debug": "^4.1.12",
"@types/kerberos": "^1.1.5",
"@types/node": "^22.18.5",
"@types/semver": "^7.7.1",
"@watchable/unpromise": "^1.0.2",
@@ -68,8 +69,9 @@
"eslint": "^9.35.0",
"fast-jwt": "^6.0.2",
"hwp": "^0.4.1",
"kafkajs": "^2.2.4",
"json5": "^2.2.3",
"kafkajs": "^2.2.4",
"kerberos": "^2.2.2",
"neostandard": "^0.12.2",
"parse5": "^7.3.0",
"prettier": "^3.6.2",