Skip to content

Commit 7372e9e

Browse files
AMW-162 Add support for KRaft
1 parent c8862f7 commit 7372e9e

File tree

19 files changed

+479
-21
lines changed

19 files changed

+479
-21
lines changed

.github/workflows/ci.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ name: CI
33
on:
44
push:
55
branches:
6-
- main
6+
- AMW-162
77
pull_request:
88
schedule:
99
- cron: '0 6 * * *'
@@ -16,4 +16,4 @@ jobs:
1616
fqcn: 'middleware_automation/amq_streams'
1717
root_permission_varname: 'amq_streams_install_requires_become'
1818
molecule_tests: >-
19-
[ "default" ]
19+
[ "default", "amq_streams_kraft" ]
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
---
2+
- name: "Automate AMQ Streams KRaft install"
3+
hosts: all
4+
vars_files:
5+
- vars.yml
6+
roles:
7+
- role: amq_streams_common
8+
- role: amq_streams_kraft
9+
tasks:
10+
- name: "Ensure AMQ Streams Broker is running and available."
11+
ansible.builtin.include_role:
12+
name: amq_streams_broker
13+
vars:
14+
amq_streams_common_skip_download: true
15+
16+
- name: "Ensure AMQ Streams Connect is running and available."
17+
ansible.builtin.include_role:
18+
name: amq_streams_connect
19+
vars:
20+
connectors:
21+
- { name: "file", path: "connectors/file.yml" }
22+
23+
- name: "Validate that KRaft deployment is functional."
24+
ansible.builtin.include_role:
25+
name: amq_streams_kraft
26+
tasks_from: validate.yml
27+
28+
- name: "Validate that Broker deployment is functional."
29+
ansible.builtin.include_role:
30+
name: amq_streams_broker
31+
tasks_from: validate.yml
32+
33+
- name: "Validate that Connect deployment is functional."
34+
ansible.builtin.include_role:
35+
name: amq_streams_connect
36+
tasks_from: validate.yml
37+
38+
post_tasks:
39+
- name: "Ensures topics exist."
40+
ansible.builtin.include_role:
41+
name: amq_streams_broker
42+
tasks_from: topic/create.yml
43+
loop: "{{ amq_streams_broker_topics }}"
44+
loop_control:
45+
loop_var: topic
46+
vars:
47+
topic_name: "{{ topic.name }}"
48+
topic_partitions: "{{ topic.partitions }}"
49+
topic_replication_factor: "{{ topic.replication_factor }}"
50+
51+
- name: "Describe created topics."
52+
ansible.builtin.include_role:
53+
name: amq_streams_broker
54+
tasks_from: topic/describe.yml
55+
loop: "{{ amq_streams_broker_topics }}"
56+
loop_control:
57+
loop_var: topic
58+
vars:
59+
topic_name: "{{ topic.name }}"
60+
61+
- name: "Delete topics"
62+
ansible.builtin.include_role:
63+
name: amq_streams_broker
64+
tasks_from: topic/delete.yml
65+
loop: "{{ amq_streams_broker_topics }}"
66+
loop_control:
67+
loop_var: topic
68+
vars:
69+
topic_name: "{{ topic.name }}"
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
---
2+
driver:
3+
name: docker
4+
platforms:
5+
- name: instance
6+
image: registry.access.redhat.com/ubi9/ubi-init:latest
7+
command: "/usr/sbin/init"
8+
pre_build_image: true
9+
privileged: true
10+
groups:
11+
- brokers
12+
provisioner:
13+
name: ansible
14+
config_options:
15+
defaults:
16+
interpreter_python: auto_silent
17+
ssh_connection:
18+
pipelining: false
19+
playbooks:
20+
prepare: ../prepare.yml
21+
converge: converge.yml
22+
verify: verify.yml
23+
inventory:
24+
host_vars:
25+
localhost:
26+
ansible_python_interpreter: "{{ ansible_playbook_python }}"
27+
env:
28+
ANSIBLE_FORCE_COLOR: "true"
29+
verifier:
30+
name: ansible
31+
scenario:
32+
test_sequence:
33+
- cleanup
34+
- destroy
35+
- syntax
36+
- create
37+
- prepare
38+
- converge
39+
- idempotence
40+
- side_effect
41+
- verify
42+
- cleanup
43+
- destroy

molecule/amq_streams_kraft/roles

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
../../roles
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
---
2+
amq_streams_common_escalade_privilege_group_create: "{{ amq_streams_install_requires_become | default(true) }}"
3+
amq_streams_common_escalade_privilege_user_create: "{{ amq_streams_install_requires_become | default(true) }}"
4+
amq_streams_common_archive_extraction_requires_privilege_escalation: "{{ amq_streams_install_requires_become | default(true) }}"
5+
amq_streams_common_dependencies_require_priv: "{{ amq_streams_install_requires_become | default(true) }}"
6+
amq_streams_zookeeper_data_require_priv_escalation: "{{ amq_streams_install_requires_become | default(true) }}"
7+
amq_streams_zookeeper_restart_requires_priv_escalation: "{{ amq_streams_install_requires_become | default(true) }}"
8+
amq_streams_broker_tls_truststore_client_require_priv_escalation: "{{ amq_streams_install_requires_become | default(true) }}"
9+
amq_streams_broker_config_files_requires_privilege_escalation: "{{ amq_streams_install_requires_become | default(true) }}"
10+
amq_streams_cruise_control_path_to_capacity_file_require_priv_escalation: "{{ amq_streams_install_requires_become | default(true) }}"
11+
amq_streams_cruise_control_logfiles_requires_priv_escalation: "{{ amq_streams_install_requires_become | default(true) }}"
12+
amq_streams_connect_source_file_require_priv_escalation: "{{ amq_streams_install_requires_become | default(true) }}"
13+
amq_streams_kraft_priv_escalation: "{{ amq_streams_install_requires_become | default(true) }}"
14+
amq_streams_common_product_version: 4.1.1
15+
amq_streams_force_clean: true
16+
# 1. Run the Systemd Service as root
17+
amq_streams_broker_user: root
18+
amq_streams_broker_group: root
19+
20+
# 2. Run KRaft tasks as root
21+
amq_streams_kraft_user: root
22+
amq_streams_kraft_group: root
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
---
2+
- name: Verify
3+
hosts: all
4+
tasks:
5+
6+
- name: Populate service facts
7+
ansible.builtin.service_facts:
8+
9+
- name: Check broker service
10+
assert:
11+
that:
12+
- ansible_facts.services["amq_streams_broker.service"]["state"] == "running"
13+
- ansible_facts.services["amq_streams_broker.service"]["status"] == "enabled"
14+
15+
- name: Check controller service
16+
assert:
17+
that:
18+
- ansible_facts.services["amq_streams_controller.service"]["state"] == "running"
19+
- ansible_facts.services["amq_streams_controller.service"]["status"] == "enabled"

roles/amq_streams_broker/templates/server.properties.j2

Lines changed: 59 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,35 @@
2020
# See kafka.server.KafkaConfig for additional details and defaults
2121
#
2222

23+
# -----------------------------------------------------------------------------
24+
# MODE DETERMINATION (Added for Kafka 4.0+ KRaft Support)
25+
# -----------------------------------------------------------------------------
26+
{% set enable_kraft = amq_streams_enable_kraft | default(amq_streams_common_product_version is version('4.0.0', '>=')) | bool %}
27+
2328
############################# Server Basics #############################
2429

2530
# The id of the broker. This must be set to a unique integer for each broker.
31+
{% if enable_kraft %}
32+
# KRaft uses node.id instead of broker.id
33+
node.id={{ amq_streams_kraft_node_id | default(1) }}
34+
{% else %}
2635
broker.id={{ amq_streams_broker_broker_id | default(amq_streams_broker_inventory_group.index(inventory_hostname)) }}
36+
{% endif %}
37+
38+
############################# KRaft Settings (Kafka 4.0+) #############################
39+
{% if enable_kraft %}
40+
# The roles of this process. broker, controller, or both.
41+
process.roles={{ amq_streams_kraft_process_roles | default('broker,controller') }}
42+
43+
# The connect string for the controller quorum
44+
controller.quorum.voters={{ amq_streams_kraft_controller_quorum_voters }}
45+
46+
# Listener name used for the controller
47+
controller.listener.names={{ amq_streams_kraft_controller_listener_names | default('CONTROLLER') }}
48+
49+
# Listener name used for inter-broker communication
50+
inter.broker.listener.name={{ amq_streams_kraft_inter_broker_listener_name | default('PLAINTEXT') }}
51+
{% endif %}
2752

2853
############################# Socket Server Settings #############################
2954

@@ -33,40 +58,57 @@ broker.id={{ amq_streams_broker_broker_id | default(amq_streams_broker_inventory
3358
# listeners = listener_name://host_name:port
3459
# EXAMPLE:
3560
# listeners = PLAINTEXT://your.host.name:9092
36-
{% if amq_streams_broker_listeners is defined %}
61+
{% if enable_kraft %}
62+
# KRaft Mode Listeners (Requires Broker + Controller ports)
63+
listeners={{ amq_streams_kraft_listeners | join(",") }}
64+
{% else %}
65+
# Legacy ZK Mode Listeners
66+
{% if amq_streams_broker_listeners is defined %}
3767
listeners={{ amq_streams_broker_listeners | join(",") }}
38-
{% elif amq_streams_broker_listener_port is defined %}
68+
{% elif amq_streams_broker_listener_port is defined %}
3969
listeners=PLAINTEXT://:{{ amq_streams_broker_listener_port }}
40-
{% else %}
70+
{% else %}
4171
#listeners=PLAINTEXT://:9092
72+
{% endif %}
4273
{% endif %}
4374

44-
{% if amq_streams_broker_inter_broker_listener is defined %}
45-
# Name of listener used for communication between brokers
75+
{% if amq_streams_broker_inter_broker_listener is defined and not enable_kraft %}
76+
# Name of listener used for communication between brokers (Legacy ZK only)
4677
inter.broker.listener.name={{ amq_streams_broker_inter_broker_listener }}
4778
{% endif %}
4879

4980
# Listener name, hostname and port the broker will advertise to clients.
5081
# If not set, it uses the value for "listeners".
51-
{% if amq_streams_broker_advertised_listeners is defined %}
52-
advertised.listeners={{ amq_streams_broker_advertised_listeners | join(",") }}
82+
{% if enable_kraft %}
83+
# KRaft Mode Advertised Listeners (Broker port only)
84+
advertised.listeners={{ amq_streams_kraft_advertised_listeners | join(",") }}
5385
{% else %}
86+
# Legacy ZK Mode Advertised Listeners
87+
{% if amq_streams_broker_advertised_listeners is defined %}
88+
advertised.listeners={{ amq_streams_broker_advertised_listeners | join(",") }}
89+
{% else %}
5490
#advertised.listeners=PLAINTEXT://your.host.name:9092
91+
{% endif %}
5592
{% endif %}
5693

57-
{% if amq_streams_broker_auth_enabled and amq_streams_broker_auth_listeners is defined %}
94+
{% if enable_kraft %}
95+
# KRaft Mode Security Map (Must include Controller)
96+
listener.security.protocol.map=PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
97+
{% else %}
98+
{% if amq_streams_broker_auth_enabled and amq_streams_broker_auth_listeners is defined %}
5899
# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
59100
listener.security.protocol.map={{ amq_streams_broker_auth_listeners | join(",") }}
60101

61102
# The list of SASL mechanisms enabled in the Kafka server
62103
sasl.enabled.mechanisms={{ amq_streams_broker_auth_sasl_mechanisms | join(",") }}
63-
{% if amq_streams_broker_inter_broker_auth_sasl_mechanisms is defined %}
104+
{% if amq_streams_broker_inter_broker_auth_sasl_mechanisms is defined %}
64105
# SASL mechanism used for inter-broker communication
65106
sasl.mechanism.inter.broker.protocol={{ amq_streams_broker_inter_broker_auth_sasl_mechanisms }}
66-
{% endif %}
67-
{% else %}
107+
{% endif %}
108+
{% else %}
68109
# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
69110
#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
111+
{% endif %}
70112
{% endif %}
71113

72114
{% if amq_streams_broker_inter_broker_listener_auth is defined %}
@@ -105,7 +147,11 @@ socket.request.max.bytes={{ amq_streams_broker_socket_request_max_bytes }}
105147
############################# Log Basics #############################
106148

107149
# A comma separated list of directories under which to store log files
150+
{% if enable_kraft %}
151+
log.dirs={{ amq_streams_kraft_log_dirs }}
152+
{% else %}
108153
log.dirs={{ amq_streams_broker_data_dir }}
154+
{% endif %}
109155

110156
# The default number of log partitions per topic. More partitions allow greater
111157
# parallelism for consumption, but this will also result in more files across
@@ -162,7 +208,7 @@ log.retention.hours={{ amq_streams_broker_log_retention_hours }}
162208
log.retention.check.interval.ms={{ amq_streams_broker_log_retention_check_interval_ms }}
163209

164210
############################# Zookeeper #############################
165-
211+
{% if not enable_kraft %}
166212
# Zookeeper connection string (see zookeeper docs for details).
167213
# This is a comma separated host:port pairs, each corresponding to a zk
168214
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
@@ -173,6 +219,7 @@ zookeeper.connect={{ amq_streams_broker_zookeeper_host }}:{{ amq_streams_broker_
173219
# Timeout in ms for connecting to zookeeper
174220
zookeeper.connection.timeout.ms={{ amq_streams_broker_zookeeper_connection_timeout_ms }}
175221
zookeeper.session.timeout.ms={{ amq_streams_broker_zookeeper_session_timeout_ms }}
222+
{% endif %}
176223

177224
############################# Group Coordinator Settings #############################
178225

roles/amq_streams_common/tasks/systemd.yml

Lines changed: 27 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -69,10 +69,30 @@
6969
enabled: yes
7070
state: started
7171

72-
- name: "Wait for service port {{ server_port }} to be available - (if provided)"
73-
ansible.builtin.wait_for:
74-
port: "{{ server_port }}"
75-
delay: "{{ delay_before_server_port_check | default(omit) }}"
76-
when:
77-
- skip_wait_for_server_port is defined and not skip_wait_for_server_port
78-
- server_port is defined and server_port != ''
72+
- block:
73+
- name: "Wait for service port {{ server_port }} to be available - (if provided)"
74+
ansible.builtin.wait_for:
75+
port: "{{ server_port }}"
76+
# Explicitly check the correct host. If Kafka binds to 0.0.0.0, 127.0.0.1 is fine.
77+
# If Kafka binds strictly to a private IP, change this to {{ ansible_host }}
78+
host: "{{ '127.0.0.1' if '0.0.0.0' in (amq_streams_kraft_listeners | default([])) else inventory_hostname }}"
79+
delay: "{{ delay_before_server_port_check | default(10) }}"
80+
timeout: 60 # Fail faster (1 min) instead of waiting 5 mins during debugging
81+
when:
82+
- skip_wait_for_server_port is defined and not skip_wait_for_server_port
83+
- server_port is defined and server_port != ''
84+
85+
rescue:
86+
- name: "FATAL: Service failed to start. Fetching logs..."
87+
ansible.builtin.shell: "journalctl -u {{ server_name }} -xe --no-pager | tail -n 50"
88+
register: kafka_crash_log
89+
changed_when: false
90+
ignore_errors: true
91+
92+
- name: "PRINT KAFKA CRASH LOGS"
93+
ansible.builtin.debug:
94+
msg: "{{ kafka_crash_log.stdout_lines }}"
95+
96+
- name: "Fail the playbook explicitly"
97+
ansible.builtin.fail:
98+
msg: "The Kafka service crashed immediately. See the logs printed above for the Java exception."

roles/amq_streams_kraft/README.md

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
# amq_streams_kraft
2+
3+
Ansible role to deploy **Apache Kafka in KRaft mode** (Kafka 4.x and Red Hat Streams for Apache Kafka 3.x).
4+
5+
## Features
6+
7+
- Installs Kafka binaries
8+
- Configures KRaft metadata quorum
9+
- Supports roles: broker, controller, **combined**
10+
- Formats metadata storage
11+
- Deploys `server.properties` and `controller.properties`
12+
- Deploys a `systemd` Kafka service
13+
- Validates active listener ports
14+
15+
## Role Variables
16+
17+
| Variable | Description | Default |
18+
|:---------|:------------|:--------|
19+
| `amq_streams_process_role` | Defines the KRaft node type | `"broker,controller"` |
20+
| `amq_streams_cluster_id` | Unique Kafka KRaft metadata cluster ID | `""` |
21+
| `amq_streams_controller_quorum_voters` | Comma-separated controller voter list in the format `<nodeId>@<host>:<port>` | `"1@localhost:9093"` |
22+
| `amq_streams_kraft_log_dirs` | Directory where Kafka stores data logs | `"/var/lib/kafka/kraft"` |
23+
| `amq_streams_kraft_metadata_log_dir` | Directory for KRaft metadata logs | `"/var/lib/kafka/kraft"` |
24+
25+
## Example Playbook
26+
27+
```yaml
28+
- hosts: kafka_nodes
29+
roles:
30+
- amq_streams_kraft
31+
```
32+
33+
## License
34+
35+
Apache License v2.0 or later
36+
37+
## Author Information
38+
39+
* [Ranabir Chakraborty](https://github.com/RanabirChakraborty)

0 commit comments

Comments
 (0)