6 changes: 6 additions & 0 deletions CHANGELOG.md
@@ -4,6 +4,12 @@ All notable changes to this project will be documented in this file.

## [Unreleased]

### Changed

- Refactor: move server configuration properties from the command line to configuration files ([#911]).

[#911]: https://github.com/stackabletech/kafka-operator/pull/911

## [25.11.0] - 2025-11-07

## [25.11.0-rc1] - 2025-11-06
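Concretely, #911 shifts dynamic settings from `--override` flags on `kafka-server-start.sh` into the role's properties file, which is then rendered by `config-utils template`. A minimal sketch of the contrast, using hypothetical keys and values rather than the operator's actual builder code:

```rust
/// Sketch only: hypothetical values illustrating the before/after of #911.
fn old_style_command(listeners: &str, opa_url: Option<&str>) -> String {
    // Before: dynamic values were appended as CLI overrides at startup.
    let mut cmd =
        String::from("bin/kafka-server-start.sh /stackable/config/broker.properties");
    cmd.push_str(&format!(" --override \"listeners={listeners}\""));
    if let Some(url) = opa_url {
        cmd.push_str(&format!(" --override \"opa.authorizer.url={url}\""));
    }
    cmd
}

fn new_style_properties(listeners: &str, opa_url: Option<&str>) -> String {
    // After: the same values are written into the properties file itself; the
    // container only copies it somewhere writable, runs `config-utils template`
    // on it, and starts Kafka with `bin/kafka-server-start.sh /tmp/broker.properties`.
    let mut properties = format!("listeners={listeners}\n");
    if let Some(url) = opa_url {
        properties.push_str(&format!("opa.authorizer.url={url}\n"));
    }
    properties
}

fn main() {
    println!("{}", old_style_command("CLIENT://0.0.0.0:9093", Some("http://opa:8081")));
    print!("{}", new_style_properties("CLIENT://0.0.0.0:9093", Some("http://opa:8081")));
}
```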
149 changes: 25 additions & 124 deletions rust/operator-binary/src/config/command.rs
@@ -9,14 +9,7 @@ use stackable_operator::{
use crate::{
crd::{
KafkaPodDescriptor, STACKABLE_CONFIG_DIR, STACKABLE_KERBEROS_KRB5_PATH,
STACKABLE_LISTENER_BOOTSTRAP_DIR, STACKABLE_LISTENER_BROKER_DIR,
listener::{KafkaListenerConfig, KafkaListenerName, node_address_cmd},
role::{
KAFKA_ADVERTISED_LISTENERS, KAFKA_CONTROLLER_QUORUM_BOOTSTRAP_SERVERS,
KAFKA_CONTROLLER_QUORUM_VOTERS, KAFKA_LISTENER_SECURITY_PROTOCOL_MAP, KAFKA_LISTENERS,
KAFKA_NODE_ID, KAFKA_NODE_ID_OFFSET, KafkaRole, broker::BROKER_PROPERTIES_FILE,
controller::CONTROLLER_PROPERTIES_FILE,
},
role::{broker::BROKER_PROPERTIES_FILE, controller::CONTROLLER_PROPERTIES_FILE},
security::KafkaTlsSecurity,
v1alpha1,
},
@@ -28,8 +21,6 @@ pub fn broker_kafka_container_commands(
kafka: &v1alpha1::KafkaCluster,
cluster_id: &str,
controller_descriptors: Vec<KafkaPodDescriptor>,
kafka_listeners: &KafkaListenerConfig,
opa_connect_string: Option<&str>,
kafka_security: &KafkaTlsSecurity,
product_version: &str,
) -> String {
@@ -51,88 +42,45 @@
true => format!("export KERBEROS_REALM=$(grep -oP 'default_realm = \\K.*' {STACKABLE_KERBEROS_KRB5_PATH})"),
false => "".to_string(),
},
broker_start_command = broker_start_command(kafka, cluster_id, controller_descriptors, kafka_listeners, opa_connect_string, kafka_security, product_version),
broker_start_command = broker_start_command(kafka, cluster_id, controller_descriptors, product_version),
}
}

fn broker_start_command(
kafka: &v1alpha1::KafkaCluster,
cluster_id: &str,
controller_descriptors: Vec<KafkaPodDescriptor>,
kafka_listeners: &KafkaListenerConfig,
opa_connect_string: Option<&str>,
kafka_security: &KafkaTlsSecurity,
product_version: &str,
) -> String {
let opa_config = match opa_connect_string {
None => "".to_string(),
Some(opa_connect_string) => {
format!(" --override \"opa.authorizer.url={opa_connect_string}\"")
}
};

let jaas_config = match kafka_security.has_kerberos_enabled() {
true => {
formatdoc! {"
--override \"{client_jaas_config}=com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true isInitiator=false keyTab=\\\"/stackable/kerberos/keytab\\\" principal=\\\"{service_name}/{broker_address}@$KERBEROS_REALM\\\";\" \
--override \"{bootstrap_jaas_config}=com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true isInitiator=false keyTab=\\\"/stackable/kerberos/keytab\\\" principal=\\\"{service_name}/{bootstrap_address}@$KERBEROS_REALM\\\";\"
",
client_jaas_config = KafkaListenerName::Client.listener_gssapi_sasl_jaas_config(),
bootstrap_jaas_config = KafkaListenerName::Bootstrap.listener_gssapi_sasl_jaas_config(),
service_name = KafkaRole::Broker.kerberos_service_name(),
broker_address = node_address_cmd(STACKABLE_LISTENER_BROKER_DIR),
bootstrap_address = node_address_cmd(STACKABLE_LISTENER_BOOTSTRAP_DIR),
}
}
false => "".to_string(),
};

let client_port = kafka_security.client_port();

// TODO: The properties file from the configmap is copied to the /tmp folder and appended with dynamic properties
// This should be improved:
// - mount emptyDir as readWriteConfig
// - use config-utils for proper replacements?
// - should we print the adapted properties file at startup?
if kafka.is_controller_configured() {
formatdoc! {"
export REPLICA_ID=$(echo \"$POD_NAME\" | grep -oE '[0-9]+$')
POD_INDEX=$(echo \"$POD_NAME\" | grep -oE '[0-9]+$')
export REPLICA_ID=$((POD_INDEX+NODE_ID_OFFSET))

cp {config_dir}/{properties_file} /tmp/{properties_file}
config-utils template /tmp/{properties_file}

echo \"{KAFKA_NODE_ID}=$((REPLICA_ID + ${KAFKA_NODE_ID_OFFSET}))\" >> /tmp/{properties_file}
echo \"{KAFKA_CONTROLLER_QUORUM_BOOTSTRAP_SERVERS}={bootstrap_servers}\" >> /tmp/{properties_file}
echo \"{KAFKA_LISTENERS}={listeners}\" >> /tmp/{properties_file}
echo \"{KAFKA_ADVERTISED_LISTENERS}={advertised_listeners}\" >> /tmp/{properties_file}
echo \"{KAFKA_LISTENER_SECURITY_PROTOCOL_MAP}={listener_security_protocol_map}\" >> /tmp/{properties_file}
echo \"{KAFKA_CONTROLLER_QUORUM_VOTERS}={controller_quorum_voters}\" >> /tmp/{properties_file}
cp {config_dir}/jaas.properties /tmp/jaas.properties
config-utils template /tmp/jaas.properties

bin/kafka-storage.sh format --cluster-id {cluster_id} --config /tmp/{properties_file} --ignore-formatted {initial_controller_command}
bin/kafka-server-start.sh /tmp/{properties_file} {opa_config}{jaas_config} &
bin/kafka-server-start.sh /tmp/{properties_file} &
",
config_dir = STACKABLE_CONFIG_DIR,
properties_file = BROKER_PROPERTIES_FILE,
bootstrap_servers = to_bootstrap_servers(&controller_descriptors, client_port),
listeners = kafka_listeners.listeners(),
advertised_listeners = kafka_listeners.advertised_listeners(),
listener_security_protocol_map = kafka_listeners.listener_security_protocol_map(),
controller_quorum_voters = to_quorum_voters(&controller_descriptors, client_port),
initial_controller_command = initial_controllers_command(&controller_descriptors, product_version, client_port),
initial_controller_command = initial_controllers_command(&controller_descriptors, product_version),
}
} else {
formatdoc! {"
bin/kafka-server-start.sh {config_dir}/{properties_file} \
--override \"zookeeper.connect=$ZOOKEEPER\" \
--override \"{KAFKA_LISTENERS}={listeners}\" \
--override \"{KAFKA_ADVERTISED_LISTENERS}={advertised_listeners}\" \
--override \"{KAFKA_LISTENER_SECURITY_PROTOCOL_MAP}={listener_security_protocol_map}\" \
{opa_config} \
{jaas_config} \
&",
cp {config_dir}/{properties_file} /tmp/{properties_file}
config-utils template /tmp/{properties_file}

cp {config_dir}/jaas.properties /tmp/jaas.properties
config-utils template /tmp/jaas.properties

bin/kafka-server-start.sh /tmp/{properties_file} &",
config_dir = STACKABLE_CONFIG_DIR,
properties_file = BROKER_PROPERTIES_FILE,
listeners = kafka_listeners.listeners(),
advertised_listeners = kafka_listeners.advertised_listeners(),
listener_security_protocol_map = kafka_listeners.listener_security_protocol_map(),
}
}
}
@@ -182,31 +130,20 @@ wait_for_termination()
pub fn controller_kafka_container_command(
cluster_id: &str,
controller_descriptors: Vec<KafkaPodDescriptor>,
kafka_listeners: &KafkaListenerConfig,
kafka_security: &KafkaTlsSecurity,
product_version: &str,
) -> String {
let client_port = kafka_security.client_port();

// TODO: The properties file from the configmap is copied to the /tmp folder and appended with dynamic properties
// This should be improved:
// - mount emptyDir as readWriteConfig
// - use config-utils for proper replacements?
// - should we print the adapted properties file at startup?
formatdoc! {"
{BASH_TRAP_FUNCTIONS}
{remove_vector_shutdown_file_command}
prepare_signal_handlers
containerdebug --output={STACKABLE_LOG_DIR}/containerdebug-state.json --loop &

export REPLICA_ID=$(echo \"$POD_NAME\" | grep -oE '[0-9]+$')
POD_INDEX=$(echo \"$POD_NAME\" | grep -oE '[0-9]+$')
export REPLICA_ID=$((POD_INDEX+NODE_ID_OFFSET))

cp {config_dir}/{properties_file} /tmp/{properties_file}

echo \"{KAFKA_NODE_ID}=$((REPLICA_ID + ${KAFKA_NODE_ID_OFFSET}))\" >> /tmp/{properties_file}
echo \"{KAFKA_CONTROLLER_QUORUM_BOOTSTRAP_SERVERS}={bootstrap_servers}\" >> /tmp/{properties_file}
echo \"{KAFKA_LISTENERS}={listeners}\" >> /tmp/{properties_file}
echo \"{KAFKA_LISTENER_SECURITY_PROTOCOL_MAP}={listener_security_protocol_map}\" >> /tmp/{properties_file}
echo \"{KAFKA_CONTROLLER_QUORUM_VOTERS}={controller_quorum_voters}\" >> /tmp/{properties_file}
config-utils template /tmp/{properties_file}

bin/kafka-storage.sh format --cluster-id {cluster_id} --config /tmp/{properties_file} --ignore-formatted {initial_controller_command}
bin/kafka-server-start.sh /tmp/{properties_file} &
@@ -217,64 +154,28 @@ pub fn controller_kafka_container_command(
remove_vector_shutdown_file_command = remove_vector_shutdown_file_command(STACKABLE_LOG_DIR),
config_dir = STACKABLE_CONFIG_DIR,
properties_file = CONTROLLER_PROPERTIES_FILE,
bootstrap_servers = to_bootstrap_servers(&controller_descriptors, client_port),
listeners = to_listeners(client_port),
listener_security_protocol_map = to_listener_security_protocol_map(kafka_listeners),
initial_controller_command = initial_controllers_command(&controller_descriptors, product_version, client_port),
controller_quorum_voters = to_quorum_voters(&controller_descriptors, client_port),
initial_controller_command = initial_controllers_command(&controller_descriptors, product_version),
create_vector_shutdown_file_command = create_vector_shutdown_file_command(STACKABLE_LOG_DIR)
}
}

fn to_listeners(port: u16) -> String {
// The environment variables are set in the statefulset of the controller
format!(
"{listener_name}://$POD_NAME.$ROLEGROUP_HEADLESS_SERVICE_NAME.$NAMESPACE.svc.$CLUSTER_DOMAIN:{port}",
listener_name = KafkaListenerName::Controller
)
}

fn to_listener_security_protocol_map(kafka_listeners: &KafkaListenerConfig) -> String {
kafka_listeners
.listener_security_protocol_map_for_listener(&KafkaListenerName::Controller)
.unwrap_or("".to_string())
}

fn to_initial_controllers(controller_descriptors: &[KafkaPodDescriptor], port: u16) -> String {
controller_descriptors
.iter()
.map(|desc| desc.as_voter(port))
.collect::<Vec<String>>()
.join(",")
}

// TODO: This can be removed once 3.7.2 is removed. Used in command.rs.
fn to_quorum_voters(controller_descriptors: &[KafkaPodDescriptor], port: u16) -> String {
controller_descriptors
.iter()
.map(|desc| desc.as_quorum_voter(port))
.collect::<Vec<String>>()
.join(",")
}

fn to_bootstrap_servers(controller_descriptors: &[KafkaPodDescriptor], port: u16) -> String {
fn to_initial_controllers(controller_descriptors: &[KafkaPodDescriptor]) -> String {
controller_descriptors
.iter()
.map(|desc| format!("{fqdn}:{port}", fqdn = desc.fqdn()))
.map(|desc| desc.as_voter())
.collect::<Vec<String>>()
.join(",")
}

fn initial_controllers_command(
controller_descriptors: &[KafkaPodDescriptor],
product_version: &str,
client_port: u16,
) -> String {
match product_version.starts_with("3.7") {
true => "".to_string(),
false => format!(
"--initial-controllers {initial_controllers}",
initial_controllers = to_initial_controllers(controller_descriptors, client_port),
initial_controllers = to_initial_controllers(controller_descriptors),
),
}
}
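The net effect in this file: listener, quorum, and OPA settings no longer travel through the shell command, and the client port is read from each descriptor instead of being passed alongside it. A self-contained sketch of the version gate around `--initial-controllers`, with a hypothetical stand-in for `KafkaPodDescriptor` (the real struct lives in `crd/mod.rs`):

```rust
// Hypothetical, simplified stand-in for KafkaPodDescriptor.
struct Descriptor {
    node_id: u32,
    fqdn: String,
    client_port: u16,
}

impl Descriptor {
    // KRaft voter syntax: <node.id>@<host>:<port>:<directory.id>.
    fn as_voter(&self) -> String {
        format!(
            "{node_id}@{fqdn}:{port}:0000000000-{node_id:0>11}",
            node_id = self.node_id,
            fqdn = self.fqdn,
            port = self.client_port,
        )
    }
}

fn initial_controllers_command(descriptors: &[Descriptor], product_version: &str) -> String {
    // Kafka 3.7 predates `kafka-storage.sh format --initial-controllers`,
    // so the flag is only emitted for newer versions; 3.7 clusters keep
    // using controller.quorum.voters instead.
    if product_version.starts_with("3.7") {
        String::new()
    } else {
        let voters: Vec<String> = descriptors.iter().map(Descriptor::as_voter).collect();
        format!("--initial-controllers {}", voters.join(","))
    }
}

fn main() {
    let descriptors = vec![Descriptor {
        node_id: 1001,
        fqdn: "kafka-controller-default-0.kafka-controller-default-headless.default.svc.cluster.local".into(),
        client_port: 9093,
    }];
    // Prints the flag with the padded directory id: ...:9093:0000000000-00000001001
    println!("{}", initial_controllers_command(&descriptors, "3.9.1"));
}
```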
6 changes: 6 additions & 0 deletions rust/operator-binary/src/config/node_id_hasher.rs
@@ -2,6 +2,12 @@ use stackable_operator::role_utils::RoleGroupRef;

use crate::crd::v1alpha1::KafkaCluster;

/// The Kafka `node.id` needs to be unique across the Kafka cluster.
/// This function generates an integer that is stable for a given role group,
/// regardless of whether it belongs to brokers or controllers.
/// This integer is then added to the pod index to compute the final `node.id`.
/// The `node.id` is only set and used in KRaft mode.
/// Warning: this is not safe from collisions.
pub fn node_id_hash32_offset(rolegroup_ref: &RoleGroupRef<KafkaCluster>) -> u32 {
let hash = fnv_hash32(&format!(
"{role}-{rolegroup}",
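The diff truncates the body of `fnv_hash32`, so the sketch below fills it in with a standard FNV-1a 32-bit hash as an assumption; only the contract from the doc comment (stable per role group, added to the pod index, not collision-free) comes from the source:

```rust
// Minimal sketch, assuming FNV-1a (32-bit); the real fnv_hash32 body is
// not shown in the diff.
fn fnv_hash32(input: &str) -> u32 {
    let mut hash: u32 = 0x811c9dc5; // FNV offset basis
    for byte in input.bytes() {
        hash ^= u32::from(byte);
        hash = hash.wrapping_mul(0x0100_0193); // FNV prime
    }
    hash
}

fn node_id_hash32_offset(role: &str, rolegroup: &str) -> u32 {
    // Stable per rolegroup; two rolegroups *can* hash to overlapping ranges
    // (the "not safe from collisions" warning in the doc comment).
    fnv_hash32(&format!("{role}-{rolegroup}"))
}

fn main() {
    let offset = node_id_hash32_offset("broker", "default");
    // Final node.id as computed in the container command:
    // REPLICA_ID=$((POD_INDEX+NODE_ID_OFFSET))
    for pod_index in 0..3u32 {
        println!("node.id for replica {pod_index}: {}", offset.wrapping_add(pod_index));
    }
}
```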
21 changes: 11 additions & 10 deletions rust/operator-binary/src/crd/listener.rs
@@ -99,13 +99,6 @@ impl KafkaListenerName {
listener_name = self.to_string().to_lowercase()
)
}

pub fn listener_gssapi_sasl_jaas_config(&self) -> String {
format!(
"listener.name.{listener_name}.gssapi.sasl.jaas.config",
listener_name = self.to_string().to_lowercase()
)
}
}

#[derive(Debug)]
@@ -330,21 +323,29 @@ pub fn get_kafka_listener_config(
})
}

pub fn node_address_cmd(directory: &str) -> String {
pub fn node_address_cmd_env(directory: &str) -> String {
format!("$(cat {directory}/default-address/address)")
}

pub fn node_port_cmd(directory: &str, port_name: &str) -> String {
pub fn node_port_cmd_env(directory: &str, port_name: &str) -> String {
format!("$(cat {directory}/default-address/ports/{port_name})")
}

pub fn node_address_cmd(directory: &str) -> String {
format!("${{file:UTF-8:{directory}/default-address/address}}")
}

pub fn node_port_cmd(directory: &str, port_name: &str) -> String {
format!("${{file:UTF-8:{directory}/default-address/ports/{port_name}}}")
}

pub fn pod_fqdn(
kafka: &v1alpha1::KafkaCluster,
sts_service_name: &str,
cluster_info: &KubernetesClusterInfo,
) -> Result<String, KafkaListenerError> {
Ok(format!(
"$POD_NAME.{sts_service_name}.{namespace}.svc.{cluster_domain}",
"${{env:POD_NAME}}.{sts_service_name}.{namespace}.svc.{cluster_domain}",
namespace = kafka.namespace().context(ObjectHasNoNamespaceSnafu)?,
cluster_domain = cluster_info.cluster_domain
))
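The listener helpers now come in two flavors: the `_env` variants keep the old shell command substitution, while the new defaults emit `config-utils` placeholders (`${env:...}`, `${file:UTF-8:...}`) that can sit inside a properties file until `config-utils template` resolves them. A rough sketch of how such a resolver might behave — the real `config-utils` syntax handling is assumed here, not taken from its source:

```rust
use std::{env, fs};

// Sketch of a resolver for the two placeholder flavors used above.
fn resolve_placeholder(placeholder: &str) -> Option<String> {
    let inner = placeholder.strip_prefix("${")?.strip_suffix('}')?;
    if let Some(var) = inner.strip_prefix("env:") {
        // ${env:POD_NAME} -> value of the POD_NAME environment variable
        return env::var(var).ok();
    }
    if let Some(path) = inner.strip_prefix("file:UTF-8:") {
        // ${file:UTF-8:/path} -> file contents, e.g. the listener address
        // written by the listener-operator under .../default-address/address
        return fs::read_to_string(path).ok().map(|s| s.trim_end().to_string());
    }
    None
}

fn main() {
    // In a pod, POD_NAME is injected via the downward API; locally this is None.
    println!("{:?}", resolve_placeholder("${env:POD_NAME}"));
    println!(
        "{:?}",
        resolve_placeholder("${file:UTF-8:/stackable/listener-broker/default-address/address}")
    );
}
```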
21 changes: 14 additions & 7 deletions rust/operator-binary/src/crd/mod.rs
@@ -254,14 +254,15 @@ impl v1alpha1::KafkaCluster {
})
}

/// List all pod descriptors of a provided role expected to form the cluster.
///
/// List pod descriptors for a given role and all its rolegroups.
/// If no role is provided, pod descriptors for all roles (and all groups) are listed.
/// We try to predict the pods here rather than looking at the current cluster state in order to
/// avoid instance churn.
pub fn pod_descriptors(
&self,
requested_kafka_role: &KafkaRole,
requested_kafka_role: Option<&KafkaRole>,
cluster_info: &KubernetesClusterInfo,
client_port: u16,
) -> Result<Vec<KafkaPodDescriptor>, Error> {
let namespace = self.metadata.namespace.clone().context(NoNamespaceSnafu)?;
let mut pod_descriptors = Vec::new();
@@ -289,17 +290,19 @@
}
};

// only return descriptors for selected role
if current_role == *requested_kafka_role {
// If no specific role is requested, or the current role matches the requested one, add pod descriptors
if requested_kafka_role.is_none() || Some(&current_role) == requested_kafka_role {
for replica in 0..replicas {
pod_descriptors.push(KafkaPodDescriptor {
namespace: namespace.clone(),
role: current_role.to_string(),
role_group_service_name: rolegroup_ref
.rolegroup_headless_service_name(),
role_group_statefulset_name: rolegroup_ref.object_name(),
replica,
cluster_domain: cluster_info.cluster_domain.clone(),
node_id: node_id_hash_offset + u32::from(replica),
client_port,
});
}
}
@@ -348,6 +351,8 @@ pub struct KafkaPodDescriptor {
replica: u16,
cluster_domain: DomainName,
node_id: u32,
pub role: String,
pub client_port: u16,
}

impl KafkaPodDescriptor {
@@ -376,18 +381,20 @@ impl KafkaPodDescriptor {
/// * controller-0 is the replica's host,
/// * 1234 is the replica's port.
// NOTE(@maltesander): Even though the UUID used here claims to be type 4, it does not work... 0000000000-00000000000 works...
pub fn as_voter(&self, port: u16) -> String {
pub fn as_voter(&self) -> String {
format!(
"{node_id}@{fqdn}:{port}:0000000000-{node_id:0>11}",
node_id = self.node_id,
port = self.client_port,
fqdn = self.fqdn(),
)
}

pub fn as_quorum_voter(&self, port: u16) -> String {
pub fn as_quorum_voter(&self) -> String {
format!(
"{node_id}@{fqdn}:{port}",
node_id = self.node_id,
port = self.client_port,
fqdn = self.fqdn(),
)
}
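With `role` and `client_port` now stored on each descriptor, callers such as `command.rs` no longer thread the port through every helper. A condensed sketch of the prediction logic — replica counts, namespace, and the headless-service naming are assumptions here, simplified from the real `pod_descriptors`:

```rust
// Simplified sketch: predict pods from the spec instead of querying the API,
// so the voter set stays stable even while pods are being (re)created.
struct PodDescriptor {
    role: String,
    statefulset_name: String,
    namespace: String,
    cluster_domain: String,
    replica: u16,
    node_id: u32,
    client_port: u16,
}

impl PodDescriptor {
    fn fqdn(&self) -> String {
        // <sts>-<replica>.<headless-svc>.<ns>.svc.<domain>; the headless
        // service name is assumed to be "<sts>-headless" in this sketch.
        format!(
            "{sts}-{replica}.{sts}-headless.{ns}.svc.{domain}",
            sts = self.statefulset_name,
            replica = self.replica,
            ns = self.namespace,
            domain = self.cluster_domain,
        )
    }
}

fn predict_pods(
    role: &str,
    statefulset_name: &str,
    replicas: u16,
    node_id_offset: u32,
    client_port: u16,
) -> Vec<PodDescriptor> {
    (0..replicas)
        .map(|replica| PodDescriptor {
            role: role.to_string(),
            statefulset_name: statefulset_name.to_string(),
            namespace: "default".into(),
            cluster_domain: "cluster.local".into(),
            replica,
            node_id: node_id_offset + u32::from(replica),
            client_port,
        })
        .collect()
}

fn main() {
    for pod in predict_pods("controller", "kafka-controller-default", 3, 1000, 9093) {
        println!("{} -> node.id {}, {}:{}", pod.role, pod.node_id, pod.fqdn(), pod.client_port);
    }
}
```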