From 262769255b188f832238031a88f7945940c10bf1 Mon Sep 17 00:00:00 2001 From: Razvan-Daniel Mihai <84674+razvan@users.noreply.github.com> Date: Thu, 6 Nov 2025 22:11:44 +0100 Subject: [PATCH 01/14] chore: move server properties from the command line to the configuration file --- rust/operator-binary/src/config/command.rs | 103 ++-------- .../src/config/node_id_hasher.rs | 5 + rust/operator-binary/src/crd/listener.rs | 6 +- rust/operator-binary/src/crd/mod.rs | 10 +- .../src/crd/role/controller.rs | 30 +-- rust/operator-binary/src/crd/role/mod.rs | 3 +- rust/operator-binary/src/kafka_controller.rs | 27 ++- .../operator-binary/src/resource/configmap.rs | 183 +++++++++++++++++- .../src/resource/statefulset.rs | 25 ++- 9 files changed, 250 insertions(+), 142 deletions(-) diff --git a/rust/operator-binary/src/config/command.rs b/rust/operator-binary/src/config/command.rs index 95281baa..674cd811 100644 --- a/rust/operator-binary/src/config/command.rs +++ b/rust/operator-binary/src/config/command.rs @@ -10,13 +10,8 @@ use crate::{ crd::{ KafkaPodDescriptor, STACKABLE_CONFIG_DIR, STACKABLE_KERBEROS_KRB5_PATH, STACKABLE_LISTENER_BOOTSTRAP_DIR, STACKABLE_LISTENER_BROKER_DIR, - listener::{KafkaListenerConfig, KafkaListenerName, node_address_cmd}, - role::{ - KAFKA_ADVERTISED_LISTENERS, KAFKA_CONTROLLER_QUORUM_BOOTSTRAP_SERVERS, - KAFKA_CONTROLLER_QUORUM_VOTERS, KAFKA_LISTENER_SECURITY_PROTOCOL_MAP, KAFKA_LISTENERS, - KAFKA_NODE_ID, KAFKA_NODE_ID_OFFSET, KafkaRole, broker::BROKER_PROPERTIES_FILE, - controller::CONTROLLER_PROPERTIES_FILE, - }, + listener::{KafkaListenerName, node_address_cmd}, + role::{KafkaRole, broker::BROKER_PROPERTIES_FILE, controller::CONTROLLER_PROPERTIES_FILE}, security::KafkaTlsSecurity, v1alpha1, }, @@ -28,8 +23,6 @@ pub fn broker_kafka_container_commands( kafka: &v1alpha1::KafkaCluster, cluster_id: &str, controller_descriptors: Vec, - kafka_listeners: &KafkaListenerConfig, - opa_connect_string: Option<&str>, kafka_security: &KafkaTlsSecurity, product_version: &str, ) -> String { @@ -51,7 +44,7 @@ pub fn broker_kafka_container_commands( true => format!("export KERBEROS_REALM=$(grep -oP 'default_realm = \\K.*' {STACKABLE_KERBEROS_KRB5_PATH})"), false => "".to_string(), }, - broker_start_command = broker_start_command(kafka, cluster_id, controller_descriptors, kafka_listeners, opa_connect_string, kafka_security, product_version), + broker_start_command = broker_start_command(kafka, cluster_id, controller_descriptors, kafka_security, product_version), } } @@ -59,18 +52,9 @@ fn broker_start_command( kafka: &v1alpha1::KafkaCluster, cluster_id: &str, controller_descriptors: Vec, - kafka_listeners: &KafkaListenerConfig, - opa_connect_string: Option<&str>, kafka_security: &KafkaTlsSecurity, product_version: &str, ) -> String { - let opa_config = match opa_connect_string { - None => "".to_string(), - Some(opa_connect_string) => { - format!(" --override \"opa.authorizer.url={opa_connect_string}\"") - } - }; - let jaas_config = match kafka_security.has_kerberos_enabled() { true => { formatdoc! {" @@ -89,50 +73,35 @@ fn broker_start_command( let client_port = kafka_security.client_port(); - // TODO: The properties file from the configmap is copied to the /tmp folder and appended with dynamic properties // This should be improved: // - mount emptyDir as readWriteConfig - // - use config-utils for proper replacements? - // - should we print the adapted properties file at startup? if kafka.is_controller_configured() { formatdoc! {" - export REPLICA_ID=$(echo \"$POD_NAME\" | grep -oE '[0-9]+$') + POD_INDEX=$(echo \"$POD_NAME\" | grep -oE '[0-9]+$') + export REPLICA_ID=$((POD_INDEX+NODE_ID_OFFSET)) + cp {config_dir}/{properties_file} /tmp/{properties_file} - echo \"{KAFKA_NODE_ID}=$((REPLICA_ID + ${KAFKA_NODE_ID_OFFSET}))\" >> /tmp/{properties_file} - echo \"{KAFKA_CONTROLLER_QUORUM_BOOTSTRAP_SERVERS}={bootstrap_servers}\" >> /tmp/{properties_file} - echo \"{KAFKA_LISTENERS}={listeners}\" >> /tmp/{properties_file} - echo \"{KAFKA_ADVERTISED_LISTENERS}={advertised_listeners}\" >> /tmp/{properties_file} - echo \"{KAFKA_LISTENER_SECURITY_PROTOCOL_MAP}={listener_security_protocol_map}\" >> /tmp/{properties_file} - echo \"{KAFKA_CONTROLLER_QUORUM_VOTERS}={controller_quorum_voters}\" >> /tmp/{properties_file} + config-utils template /tmp/{properties_file} bin/kafka-storage.sh format --cluster-id {cluster_id} --config /tmp/{properties_file} --ignore-formatted {initial_controller_command} - bin/kafka-server-start.sh /tmp/{properties_file} {opa_config}{jaas_config} & + bin/kafka-server-start.sh /tmp/{properties_file} {jaas_config} & ", config_dir = STACKABLE_CONFIG_DIR, properties_file = BROKER_PROPERTIES_FILE, - bootstrap_servers = to_bootstrap_servers(&controller_descriptors, client_port), - listeners = kafka_listeners.listeners(), - advertised_listeners = kafka_listeners.advertised_listeners(), - listener_security_protocol_map = kafka_listeners.listener_security_protocol_map(), - controller_quorum_voters = to_quorum_voters(&controller_descriptors, client_port), initial_controller_command = initial_controllers_command(&controller_descriptors, product_version, client_port), } } else { formatdoc! {" - bin/kafka-server-start.sh {config_dir}/{properties_file} \ - --override \"zookeeper.connect=$ZOOKEEPER\" \ - --override \"{KAFKA_LISTENERS}={listeners}\" \ - --override \"{KAFKA_ADVERTISED_LISTENERS}={advertised_listeners}\" \ - --override \"{KAFKA_LISTENER_SECURITY_PROTOCOL_MAP}={listener_security_protocol_map}\" \ - {opa_config} \ + cp {config_dir}/{properties_file} /tmp/{properties_file} + + config-utils template /tmp/{properties_file} + + bin/kafka-server-start.sh /tmp/{properties_file} \ {jaas_config} \ &", config_dir = STACKABLE_CONFIG_DIR, properties_file = BROKER_PROPERTIES_FILE, - listeners = kafka_listeners.listeners(), - advertised_listeners = kafka_listeners.advertised_listeners(), - listener_security_protocol_map = kafka_listeners.listener_security_protocol_map(), } } } @@ -182,7 +151,6 @@ wait_for_termination() pub fn controller_kafka_container_command( cluster_id: &str, controller_descriptors: Vec, - kafka_listeners: &KafkaListenerConfig, kafka_security: &KafkaTlsSecurity, product_version: &str, ) -> String { @@ -199,14 +167,12 @@ pub fn controller_kafka_container_command( prepare_signal_handlers containerdebug --output={STACKABLE_LOG_DIR}/containerdebug-state.json --loop & - export REPLICA_ID=$(echo \"$POD_NAME\" | grep -oE '[0-9]+$') + POD_INDEX=$(echo \"$POD_NAME\" | grep -oE '[0-9]+$') + export REPLICA_ID=$((POD_INDEX+NODE_ID_OFFSET)) + cp {config_dir}/{properties_file} /tmp/{properties_file} - echo \"{KAFKA_NODE_ID}=$((REPLICA_ID + ${KAFKA_NODE_ID_OFFSET}))\" >> /tmp/{properties_file} - echo \"{KAFKA_CONTROLLER_QUORUM_BOOTSTRAP_SERVERS}={bootstrap_servers}\" >> /tmp/{properties_file} - echo \"{KAFKA_LISTENERS}={listeners}\" >> /tmp/{properties_file} - echo \"{KAFKA_LISTENER_SECURITY_PROTOCOL_MAP}={listener_security_protocol_map}\" >> /tmp/{properties_file} - echo \"{KAFKA_CONTROLLER_QUORUM_VOTERS}={controller_quorum_voters}\" >> /tmp/{properties_file} + config-utils template /tmp/{properties_file} bin/kafka-storage.sh format --cluster-id {cluster_id} --config /tmp/{properties_file} --ignore-formatted {initial_controller_command} bin/kafka-server-start.sh /tmp/{properties_file} & @@ -217,29 +183,11 @@ pub fn controller_kafka_container_command( remove_vector_shutdown_file_command = remove_vector_shutdown_file_command(STACKABLE_LOG_DIR), config_dir = STACKABLE_CONFIG_DIR, properties_file = CONTROLLER_PROPERTIES_FILE, - bootstrap_servers = to_bootstrap_servers(&controller_descriptors, client_port), - listeners = to_listeners(client_port), - listener_security_protocol_map = to_listener_security_protocol_map(kafka_listeners), initial_controller_command = initial_controllers_command(&controller_descriptors, product_version, client_port), - controller_quorum_voters = to_quorum_voters(&controller_descriptors, client_port), create_vector_shutdown_file_command = create_vector_shutdown_file_command(STACKABLE_LOG_DIR) } } -fn to_listeners(port: u16) -> String { - // The environment variables are set in the statefulset of the controller - format!( - "{listener_name}://$POD_NAME.$ROLEGROUP_HEADLESS_SERVICE_NAME.$NAMESPACE.svc.$CLUSTER_DOMAIN:{port}", - listener_name = KafkaListenerName::Controller - ) -} - -fn to_listener_security_protocol_map(kafka_listeners: &KafkaListenerConfig) -> String { - kafka_listeners - .listener_security_protocol_map_for_listener(&KafkaListenerName::Controller) - .unwrap_or("".to_string()) -} - fn to_initial_controllers(controller_descriptors: &[KafkaPodDescriptor], port: u16) -> String { controller_descriptors .iter() @@ -248,23 +196,6 @@ fn to_initial_controllers(controller_descriptors: &[KafkaPodDescriptor], port: u .join(",") } -// TODO: This can be removed once 3.7.2 is removed. Used in command.rs. -fn to_quorum_voters(controller_descriptors: &[KafkaPodDescriptor], port: u16) -> String { - controller_descriptors - .iter() - .map(|desc| desc.as_quorum_voter(port)) - .collect::>() - .join(",") -} - -fn to_bootstrap_servers(controller_descriptors: &[KafkaPodDescriptor], port: u16) -> String { - controller_descriptors - .iter() - .map(|desc| format!("{fqdn}:{port}", fqdn = desc.fqdn())) - .collect::>() - .join(",") -} - fn initial_controllers_command( controller_descriptors: &[KafkaPodDescriptor], product_version: &str, diff --git a/rust/operator-binary/src/config/node_id_hasher.rs b/rust/operator-binary/src/config/node_id_hasher.rs index eebee090..eb4c7951 100644 --- a/rust/operator-binary/src/config/node_id_hasher.rs +++ b/rust/operator-binary/src/config/node_id_hasher.rs @@ -2,6 +2,11 @@ use stackable_operator::role_utils::RoleGroupRef; use crate::crd::v1alpha1::KafkaCluster; +/// The Kafka node.id needs to be unique across the Kafka cluster. +/// This function generates an integer that is stable for a given role group +/// regardless if broker or controllers. +/// This integer is then added to the pod index to compute the final node.id +/// TODO: this is dangerous. How high are the chances of these ranges overlapping? pub fn node_id_hash32_offset(rolegroup_ref: &RoleGroupRef) -> u32 { let hash = fnv_hash32(&format!( "{role}-{rolegroup}", diff --git a/rust/operator-binary/src/crd/listener.rs b/rust/operator-binary/src/crd/listener.rs index ed2b2100..66c9bbb6 100644 --- a/rust/operator-binary/src/crd/listener.rs +++ b/rust/operator-binary/src/crd/listener.rs @@ -331,11 +331,11 @@ pub fn get_kafka_listener_config( } pub fn node_address_cmd(directory: &str) -> String { - format!("$(cat {directory}/default-address/address)") + format!("${{file:UTF-8:{directory}/default-address/address}}") } pub fn node_port_cmd(directory: &str, port_name: &str) -> String { - format!("$(cat {directory}/default-address/ports/{port_name})") + format!("${{file:UTF-8:{directory}/default-address/ports/{port_name}}}") } pub fn pod_fqdn( @@ -344,7 +344,7 @@ pub fn pod_fqdn( cluster_info: &KubernetesClusterInfo, ) -> Result { Ok(format!( - "$POD_NAME.{sts_service_name}.{namespace}.svc.{cluster_domain}", + "${{env:POD_NAME}}.{sts_service_name}.{namespace}.svc.{cluster_domain}", namespace = kafka.namespace().context(ObjectHasNoNamespaceSnafu)?, cluster_domain = cluster_info.cluster_domain )) diff --git a/rust/operator-binary/src/crd/mod.rs b/rust/operator-binary/src/crd/mod.rs index 9748388c..259032cf 100644 --- a/rust/operator-binary/src/crd/mod.rs +++ b/rust/operator-binary/src/crd/mod.rs @@ -254,13 +254,14 @@ impl v1alpha1::KafkaCluster { }) } - /// List all pod descriptors of a provided role expected to form the cluster. + /// List pod descriptors for a given role. + /// If no role is provided, pod descriptors for all roles are listed. /// /// We try to predict the pods here rather than looking at the current cluster state in order to /// avoid instance churn. pub fn pod_descriptors( &self, - requested_kafka_role: &KafkaRole, + requested_kafka_role: Option<&KafkaRole>, cluster_info: &KubernetesClusterInfo, ) -> Result, Error> { let namespace = self.metadata.namespace.clone().context(NoNamespaceSnafu)?; @@ -290,10 +291,12 @@ impl v1alpha1::KafkaCluster { }; // only return descriptors for selected role - if current_role == *requested_kafka_role { + if requested_kafka_role.is_none() || ¤t_role == requested_kafka_role.unwrap() + { for replica in 0..replicas { pod_descriptors.push(KafkaPodDescriptor { namespace: namespace.clone(), + role: current_role.to_string(), role_group_service_name: rolegroup_ref .rolegroup_headless_service_name(), role_group_statefulset_name: rolegroup_ref.object_name(), @@ -348,6 +351,7 @@ pub struct KafkaPodDescriptor { replica: u16, cluster_domain: DomainName, node_id: u32, + pub role: String, } impl KafkaPodDescriptor { diff --git a/rust/operator-binary/src/crd/role/controller.rs b/rust/operator-binary/src/crd/role/controller.rs index 9be5464f..5b9513a5 100644 --- a/rust/operator-binary/src/crd/role/controller.rs +++ b/rust/operator-binary/src/crd/role/controller.rs @@ -15,11 +15,7 @@ use stackable_operator::{ use strum::{Display, EnumIter}; use crate::crd::{ - listener::KafkaListenerName, - role::{ - KAFKA_LOG_DIRS, KAFKA_PROCESS_ROLES, KafkaRole, - commons::{CommonConfig, Storage, StorageFragment}, - }, + role::commons::{CommonConfig, Storage, StorageFragment}, v1alpha1, }; @@ -121,29 +117,9 @@ impl Configuration for ControllerConfigFragment { &self, _resource: &Self::Configurable, _role_name: &str, - file: &str, + _file: &str, ) -> Result>, stackable_operator::product_config_utils::Error> { - let mut config = BTreeMap::new(); - - if file == CONTROLLER_PROPERTIES_FILE { - config.insert( - KAFKA_LOG_DIRS.to_string(), - Some("/stackable/data/kraft".to_string()), - ); - - // KRAFT - config.insert( - KAFKA_PROCESS_ROLES.to_string(), - Some(KafkaRole::Controller.to_string()), - ); - - config.insert( - "controller.listener.names".to_string(), - Some(KafkaListenerName::Controller.to_string()), - ); - } - - Ok(config) + Ok(BTreeMap::new()) } } diff --git a/rust/operator-binary/src/crd/role/mod.rs b/rust/operator-binary/src/crd/role/mod.rs index 47210ea4..1857f951 100644 --- a/rust/operator-binary/src/crd/role/mod.rs +++ b/rust/operator-binary/src/crd/role/mod.rs @@ -68,7 +68,8 @@ pub const KAFKA_CONTROLLER_QUORUM_BOOTSTRAP_SERVERS: &str = "controller.quorum.b /// Map of id/endpoint information for the set of voters in a comma-separated list of {id}@{host}:{port} entries. /// For example: 1@localhost:9092,2@localhost:9093,3@localhost:9094 -pub const KAFKA_CONTROLLER_QUORUM_VOTERS: &str = "controller.quorum.voters"; +/// TODO: maybe re-enable +// pub const KAFKA_CONTROLLER_QUORUM_VOTERS: &str = "controller.quorum.voters"; #[derive(Snafu, Debug)] pub enum Error { diff --git a/rust/operator-binary/src/kafka_controller.rs b/rust/operator-binary/src/kafka_controller.rs index fbce9c2a..1f1e8806 100644 --- a/rust/operator-binary/src/kafka_controller.rs +++ b/rust/operator-binary/src/kafka_controller.rs @@ -37,6 +37,7 @@ use crate::{ crd::{ self, APP_NAME, DOCKER_IMAGE_BASE_NAME, JVM_SECURITY_PROPERTIES_FILE, KafkaClusterStatus, OPERATOR_NAME, + listener::get_kafka_listener_config, role::{ AnyConfig, KafkaRole, broker::BROKER_PROPERTIES_FILE, controller::CONTROLLER_PROPERTIES_FILE, @@ -66,6 +67,14 @@ pub struct Ctx { #[strum_discriminants(derive(IntoStaticStr))] #[allow(clippy::enum_variant_names)] pub enum Error { + #[snafu(display("failed to build pod descriptors"))] + BuildPodDescriptors { source: crate::crd::Error }, + + #[snafu(display("invalid kafka listeners"))] + InvalidKafkaListeners { + source: crate::crd::listener::KafkaListenerError, + }, + #[snafu(display("cluster object defines no '{role}' role"))] MissingKafkaRole { source: crate::crd::Error, @@ -242,6 +251,8 @@ impl ReconcilerError for Error { Error::BuildConfigMap { .. } => None, Error::BuildService { .. } => None, Error::BuildListener { .. } => None, + Error::InvalidKafkaListeners { .. } => None, + Error::BuildPodDescriptors { .. } => None, } } } @@ -359,6 +370,18 @@ pub async fn reconcile_kafka( build_rolegroup_metrics_service(kafka, &resolved_product_image, &rolegroup_ref) .context(BuildServiceSnafu)?; + let kafka_listeners = get_kafka_listener_config( + kafka, + &kafka_security, + &rolegroup_ref, + &client.kubernetes_cluster_info, + ) + .context(InvalidKafkaListenersSnafu)?; + + let pod_descriptors = kafka + .pod_descriptors(None, &client.kubernetes_cluster_info) + .context(BuildPodDescriptorsSnafu)?; + let rg_configmap = build_rolegroup_config_map( kafka, &resolved_product_image, @@ -366,6 +389,9 @@ pub async fn reconcile_kafka( &rolegroup_ref, rolegroup_config, &merged_config, + &kafka_listeners, + &pod_descriptors, + opa_connect.as_deref(), ) .context(BuildConfigMapSnafu)?; @@ -376,7 +402,6 @@ pub async fn reconcile_kafka( &resolved_product_image, &rolegroup_ref, rolegroup_config, - opa_connect.as_deref(), &kafka_security, &merged_config, &rbac_sa, diff --git a/rust/operator-binary/src/resource/configmap.rs b/rust/operator-binary/src/resource/configmap.rs index 76368fef..d92d34cb 100644 --- a/rust/operator-binary/src/resource/configmap.rs +++ b/rust/operator-binary/src/resource/configmap.rs @@ -1,7 +1,10 @@ -use std::collections::{BTreeMap, HashMap}; +use std::{ + collections::{BTreeMap, HashMap}, + str::FromStr, +}; use product_config::{types::PropertyNameKind, writer::to_java_properties_string}; -use snafu::{ResultExt, Snafu}; +use snafu::{OptionExt, ResultExt, Snafu}; use stackable_operator::{ builder::{configmap::ConfigMapBuilder, meta::ObjectMetaBuilder}, commons::product_image_selection::ResolvedProductImage, @@ -10,7 +13,17 @@ use stackable_operator::{ }; use crate::{ - crd::{JVM_SECURITY_PROPERTIES_FILE, role::AnyConfig, security::KafkaTlsSecurity, v1alpha1}, + crd::{ + JVM_SECURITY_PROPERTIES_FILE, KafkaPodDescriptor, + listener::{KafkaListenerConfig, KafkaListenerName}, + role::{ + AnyConfig, KAFKA_ADVERTISED_LISTENERS, KAFKA_CONTROLLER_QUORUM_BOOTSTRAP_SERVERS, + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP, KAFKA_LISTENERS, KAFKA_LOG_DIRS, KAFKA_NODE_ID, + KAFKA_PROCESS_ROLES, KafkaRole, + }, + security::KafkaTlsSecurity, + v1alpha1, + }, kafka_controller::KAFKA_CONTROLLER_NAME, operations::graceful_shutdown::graceful_shutdown_config_properties, product_logging::extend_role_group_config_map, @@ -49,9 +62,19 @@ pub enum Error { source: product_config::writer::PropertiesWriterError, rolegroup: RoleGroupRef, }, + + #[snafu(display("no Kraft controllers found to build"))] + NoKraftControllersFound, + + #[snafu(display("unknown Kafka role [{name}]"))] + UnknownKafkaRole { + source: strum::ParseError, + name: String, + }, } /// The rolegroup [`ConfigMap`] configures the rolegroup based on the configuration given by the administrator +#[allow(clippy::too_many_arguments)] pub fn build_rolegroup_config_map( kafka: &v1alpha1::KafkaCluster, resolved_product_image: &ResolvedProductImage, @@ -59,13 +82,27 @@ pub fn build_rolegroup_config_map( rolegroup: &RoleGroupRef, rolegroup_config: &HashMap>, merged_config: &AnyConfig, + listener_config: &KafkaListenerConfig, + pod_descriptors: &[KafkaPodDescriptor], + opa_connect_string: Option<&str>, ) -> Result { let kafka_config_file_name = merged_config.config_file_name(); - let mut kafka_config = rolegroup_config - .get(&PropertyNameKind::File(kafka_config_file_name.to_string())) - .cloned() - .unwrap_or_default(); + let mut kafka_config = server_properties_file( + kafka.is_controller_configured(), + &rolegroup.role, + pod_descriptors, + listener_config, + opa_connect_string, + )?; + + // Need to call this to get configOverrides :( + kafka_config.extend( + rolegroup_config + .get(&PropertyNameKind::File(kafka_config_file_name.to_string())) + .cloned() + .unwrap_or_default(), + ); match merged_config { AnyConfig::Broker(_) => kafka_config.extend(kafka_security.broker_config_settings()), @@ -154,3 +191,135 @@ pub fn build_rolegroup_config_map( rolegroup: rolegroup.clone(), }) } + +fn server_properties_file( + kraft_mode: bool, + role: &str, + pod_descriptors: &[KafkaPodDescriptor], + listener_config: &KafkaListenerConfig, + opa_connect_string: Option<&str>, +) -> Result, Error> { + let kraft_controllers = kraft_controllers(pod_descriptors); + + let role = KafkaRole::from_str(role).context(UnknownKafkaRoleSnafu { + name: role.to_string(), + })?; + + match role { + KafkaRole::Controller => { + let kraft_controllers = kraft_controllers.context(NoKraftControllersFoundSnafu)?; + + Ok(BTreeMap::from([ + ( + KAFKA_LOG_DIRS.to_string(), + "/stackable/data/kraft".to_string(), + ), + (KAFKA_PROCESS_ROLES.to_string(), role.to_string()), + ( + "controller.listener.names".to_string(), + KafkaListenerName::Controller.to_string(), + ), + ( + KAFKA_NODE_ID.to_string(), + "${env:REPLICA_ID}".to_string(), + ), + ( + KAFKA_CONTROLLER_QUORUM_BOOTSTRAP_SERVERS.to_string(), + kraft_controllers.clone(), + ), + // TODO: figure this out + //(KAFKA_CONTROLLER_QUORUM_VOTERS.to_string(), + //kraft_controllers, + //), + ( + KAFKA_LISTENERS.to_string(), + "CONTROLLER://${env:POD_NAME}.${env:ROLEGROUP_HEADLESS_SERVICE_NAME}.${env:NAMESPACE}.svc.${env:CLUSTER_DOMAIN}:${env:KAFKA_CLIENT_PORT}".to_string(), + ), + ( + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP.to_string(), + listener_config + .listener_security_protocol_map_for_listener(&KafkaListenerName::Controller) + .unwrap_or("".to_string())), + ])) + } + KafkaRole::Broker => { + let mut result = BTreeMap::from([ + ( + KAFKA_LOG_DIRS.to_string(), + "/stackable/data/topicdata".to_string(), + ), + (KAFKA_LISTENERS.to_string(), listener_config.listeners()), + ( + KAFKA_ADVERTISED_LISTENERS.to_string(), + listener_config.advertised_listeners(), + ), + ( + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP.to_string(), + listener_config.listener_security_protocol_map(), + ), + ]); + + if kraft_mode { + let kraft_controllers = kraft_controllers.context(NoKraftControllersFoundSnafu)?; + + // Running in KRaft mode + result.extend([ + (KAFKA_NODE_ID.to_string(), "${env:REPLICA_ID}".to_string()), + ( + KAFKA_PROCESS_ROLES.to_string(), + KafkaRole::Broker.to_string(), + ), + ( + "controller.listener.names".to_string(), + KafkaListenerName::Controller.to_string(), + ), + ( + KAFKA_CONTROLLER_QUORUM_BOOTSTRAP_SERVERS.to_string(), + kraft_controllers.clone(), + ), + ]); + } else { + // Running with ZooKeeper enabled + result.extend([( + "zookeeper.connect".to_string(), + "${env:ZOOKEEPER}".to_string(), + )]); + } + + // Enable OPA authorization + if opa_connect_string.is_some() { + result.extend([ + ( + "authorizer.class.name".to_string(), + "org.openpolicyagent.kafka.OpaAuthorizer".to_string(), + ), + ( + "opa.authorizer.metrics.enabled".to_string(), + "true".to_string(), + ), + ( + "opa.authorizer.url".to_string(), + opa_connect_string.unwrap_or_default().to_string(), + ), + ]); + } + + Ok(result) + } + } +} + +fn kraft_controllers(pod_descriptors: &[KafkaPodDescriptor]) -> Option { + let result = pod_descriptors + .iter() + .filter(|pd| pd.role == KafkaRole::Controller.to_string()) + .map(|desc| format!("{fqdn}:${{env:KAFKA_CLIENT_PORT}}", fqdn = desc.fqdn())) + .collect::>() + .join(","); + + if result.is_empty() { + None + } else { + Some(result) + } +} diff --git a/rust/operator-binary/src/resource/statefulset.rs b/rust/operator-binary/src/resource/statefulset.rs index 67343518..0c52689b 100644 --- a/rust/operator-binary/src/resource/statefulset.rs +++ b/rust/operator-binary/src/resource/statefulset.rs @@ -52,7 +52,6 @@ use crate::{ LISTENER_BROKER_VOLUME_NAME, LOG_DIRS_VOLUME_NAME, METRICS_PORT, METRICS_PORT_NAME, STACKABLE_CONFIG_DIR, STACKABLE_DATA_DIR, STACKABLE_LISTENER_BOOTSTRAP_DIR, STACKABLE_LISTENER_BROKER_DIR, - listener::get_kafka_listener_config, role::{ AnyConfig, KAFKA_NODE_ID_OFFSET, KafkaRole, broker::BrokerContainer, controller::ControllerContainer, @@ -169,7 +168,6 @@ pub fn build_broker_rolegroup_statefulset( resolved_product_image: &ResolvedProductImage, rolegroup_ref: &RoleGroupRef, broker_config: &HashMap>, - opa_connect_string: Option<&str>, kafka_security: &KafkaTlsSecurity, merged_config: &AnyConfig, service_account: &ServiceAccount, @@ -285,10 +283,6 @@ pub fn build_broker_rolegroup_statefulset( ..EnvVar::default() }); - let kafka_listeners = - get_kafka_listener_config(kafka, kafka_security, rolegroup_ref, cluster_info) - .context(InvalidKafkaListenersSnafu)?; - let cluster_id = kafka.cluster_id().context(ClusterIdMissingSnafu)?; cb_kafka @@ -305,10 +299,8 @@ pub fn build_broker_rolegroup_statefulset( cluster_id, // we need controller pods kafka - .pod_descriptors(&KafkaRole::Controller, cluster_info) + .pod_descriptors(Some(&KafkaRole::Controller), cluster_info) .context(BuildPodDescriptorsSnafu)?, - &kafka_listeners, - opa_connect_string, kafka_security, &resolved_product_image.product_version, )]) @@ -337,6 +329,10 @@ pub fn build_broker_rolegroup_statefulset( KAFKA_NODE_ID_OFFSET, node_id_hash32_offset(rolegroup_ref).to_string(), ) + .add_env_var( + "KAFKA_CLIENT_PORT".to_string(), + kafka_security.client_port().to_string(), + ) .add_env_vars(env) .add_container_ports(container_ports(kafka_security)) .add_volume_mount(LOG_DIRS_VOLUME_NAME, STACKABLE_DATA_DIR) @@ -628,9 +624,11 @@ pub fn build_controller_rolegroup_statefulset( ..EnvVar::default() }); - let kafka_listeners = - get_kafka_listener_config(kafka, kafka_security, rolegroup_ref, cluster_info) - .context(InvalidKafkaListenersSnafu)?; + env.push(EnvVar { + name: "KAFKA_CLIENT_PORT".to_string(), + value: Some(kafka_security.client_port().to_string()), + ..EnvVar::default() + }); cb_kafka .image_from_product_image(resolved_product_image) @@ -644,9 +642,8 @@ pub fn build_controller_rolegroup_statefulset( .args(vec![controller_kafka_container_command( kafka.cluster_id().context(ClusterIdMissingSnafu)?, kafka - .pod_descriptors(kafka_role, cluster_info) + .pod_descriptors(Some(kafka_role), cluster_info) .context(BuildPodDescriptorsSnafu)?, - &kafka_listeners, kafka_security, &resolved_product_image.product_version, )]) From 3a1cba99a9b05bc2cc51897605a13c150b133a20 Mon Sep 17 00:00:00 2001 From: Razvan-Daniel Mihai <84674+razvan@users.noreply.github.com> Date: Fri, 7 Nov 2025 21:06:41 +0100 Subject: [PATCH 02/14] fix 3.7 deployments and tests --- rust/operator-binary/src/config/command.rs | 16 ++---- .../src/config/node_id_hasher.rs | 3 +- rust/operator-binary/src/crd/mod.rs | 9 +++- rust/operator-binary/src/crd/role/mod.rs | 3 +- rust/operator-binary/src/kafka_controller.rs | 6 ++- .../operator-binary/src/resource/configmap.rs | 53 +++++++++++++++---- .../src/resource/statefulset.rs | 9 ++-- 7 files changed, 70 insertions(+), 29 deletions(-) diff --git a/rust/operator-binary/src/config/command.rs b/rust/operator-binary/src/config/command.rs index 674cd811..2000940e 100644 --- a/rust/operator-binary/src/config/command.rs +++ b/rust/operator-binary/src/config/command.rs @@ -71,8 +71,6 @@ fn broker_start_command( false => "".to_string(), }; - let client_port = kafka_security.client_port(); - // This should be improved: // - mount emptyDir as readWriteConfig if kafka.is_controller_configured() { @@ -89,7 +87,7 @@ fn broker_start_command( ", config_dir = STACKABLE_CONFIG_DIR, properties_file = BROKER_PROPERTIES_FILE, - initial_controller_command = initial_controllers_command(&controller_descriptors, product_version, client_port), + initial_controller_command = initial_controllers_command(&controller_descriptors, product_version), } } else { formatdoc! {" @@ -151,11 +149,8 @@ wait_for_termination() pub fn controller_kafka_container_command( cluster_id: &str, controller_descriptors: Vec, - kafka_security: &KafkaTlsSecurity, product_version: &str, ) -> String { - let client_port = kafka_security.client_port(); - // TODO: The properties file from the configmap is copied to the /tmp folder and appended with dynamic properties // This should be improved: // - mount emptyDir as readWriteConfig @@ -183,15 +178,15 @@ pub fn controller_kafka_container_command( remove_vector_shutdown_file_command = remove_vector_shutdown_file_command(STACKABLE_LOG_DIR), config_dir = STACKABLE_CONFIG_DIR, properties_file = CONTROLLER_PROPERTIES_FILE, - initial_controller_command = initial_controllers_command(&controller_descriptors, product_version, client_port), + initial_controller_command = initial_controllers_command(&controller_descriptors, product_version), create_vector_shutdown_file_command = create_vector_shutdown_file_command(STACKABLE_LOG_DIR) } } -fn to_initial_controllers(controller_descriptors: &[KafkaPodDescriptor], port: u16) -> String { +fn to_initial_controllers(controller_descriptors: &[KafkaPodDescriptor]) -> String { controller_descriptors .iter() - .map(|desc| desc.as_voter(port)) + .map(|desc| desc.as_voter()) .collect::>() .join(",") } @@ -199,13 +194,12 @@ fn to_initial_controllers(controller_descriptors: &[KafkaPodDescriptor], port: u fn initial_controllers_command( controller_descriptors: &[KafkaPodDescriptor], product_version: &str, - client_port: u16, ) -> String { match product_version.starts_with("3.7") { true => "".to_string(), false => format!( "--initial-controllers {initial_controllers}", - initial_controllers = to_initial_controllers(controller_descriptors, client_port), + initial_controllers = to_initial_controllers(controller_descriptors), ), } } diff --git a/rust/operator-binary/src/config/node_id_hasher.rs b/rust/operator-binary/src/config/node_id_hasher.rs index eb4c7951..51690a0e 100644 --- a/rust/operator-binary/src/config/node_id_hasher.rs +++ b/rust/operator-binary/src/config/node_id_hasher.rs @@ -6,7 +6,8 @@ use crate::crd::v1alpha1::KafkaCluster; /// This function generates an integer that is stable for a given role group /// regardless if broker or controllers. /// This integer is then added to the pod index to compute the final node.id -/// TODO: this is dangerous. How high are the chances of these ranges overlapping? +/// The node.id is only set and used in Kraft mode. +/// Warning: this is not safe from collisions. pub fn node_id_hash32_offset(rolegroup_ref: &RoleGroupRef) -> u32 { let hash = fnv_hash32(&format!( "{role}-{rolegroup}", diff --git a/rust/operator-binary/src/crd/mod.rs b/rust/operator-binary/src/crd/mod.rs index 259032cf..fa2804d3 100644 --- a/rust/operator-binary/src/crd/mod.rs +++ b/rust/operator-binary/src/crd/mod.rs @@ -263,6 +263,7 @@ impl v1alpha1::KafkaCluster { &self, requested_kafka_role: Option<&KafkaRole>, cluster_info: &KubernetesClusterInfo, + client_port: u16, ) -> Result, Error> { let namespace = self.metadata.namespace.clone().context(NoNamespaceSnafu)?; let mut pod_descriptors = Vec::new(); @@ -303,6 +304,7 @@ impl v1alpha1::KafkaCluster { replica, cluster_domain: cluster_info.cluster_domain.clone(), node_id: node_id_hash_offset + u32::from(replica), + client_port, }); } } @@ -352,6 +354,7 @@ pub struct KafkaPodDescriptor { cluster_domain: DomainName, node_id: u32, pub role: String, + pub client_port: u16, } impl KafkaPodDescriptor { @@ -380,18 +383,20 @@ impl KafkaPodDescriptor { /// * controller-0 is the replica's host, /// * 1234 is the replica's port. // NOTE(@maltesander): Even though the used Uuid states to be type 4 it does not work... 0000000000-00000000000 works... - pub fn as_voter(&self, port: u16) -> String { + pub fn as_voter(&self) -> String { format!( "{node_id}@{fqdn}:{port}:0000000000-{node_id:0>11}", node_id = self.node_id, + port = self.client_port, fqdn = self.fqdn(), ) } - pub fn as_quorum_voter(&self, port: u16) -> String { + pub fn as_quorum_voter(&self) -> String { format!( "{node_id}@{fqdn}:{port}", node_id = self.node_id, + port = self.client_port, fqdn = self.fqdn(), ) } diff --git a/rust/operator-binary/src/crd/role/mod.rs b/rust/operator-binary/src/crd/role/mod.rs index 1857f951..47210ea4 100644 --- a/rust/operator-binary/src/crd/role/mod.rs +++ b/rust/operator-binary/src/crd/role/mod.rs @@ -68,8 +68,7 @@ pub const KAFKA_CONTROLLER_QUORUM_BOOTSTRAP_SERVERS: &str = "controller.quorum.b /// Map of id/endpoint information for the set of voters in a comma-separated list of {id}@{host}:{port} entries. /// For example: 1@localhost:9092,2@localhost:9093,3@localhost:9094 -/// TODO: maybe re-enable -// pub const KAFKA_CONTROLLER_QUORUM_VOTERS: &str = "controller.quorum.voters"; +pub const KAFKA_CONTROLLER_QUORUM_VOTERS: &str = "controller.quorum.voters"; #[derive(Snafu, Debug)] pub enum Error { diff --git a/rust/operator-binary/src/kafka_controller.rs b/rust/operator-binary/src/kafka_controller.rs index 1f1e8806..aebe52c3 100644 --- a/rust/operator-binary/src/kafka_controller.rs +++ b/rust/operator-binary/src/kafka_controller.rs @@ -379,7 +379,11 @@ pub async fn reconcile_kafka( .context(InvalidKafkaListenersSnafu)?; let pod_descriptors = kafka - .pod_descriptors(None, &client.kubernetes_cluster_info) + .pod_descriptors( + None, + &client.kubernetes_cluster_info, + kafka_security.client_port(), + ) .context(BuildPodDescriptorsSnafu)?; let rg_configmap = build_rolegroup_config_map( diff --git a/rust/operator-binary/src/resource/configmap.rs b/rust/operator-binary/src/resource/configmap.rs index d92d34cb..6d657593 100644 --- a/rust/operator-binary/src/resource/configmap.rs +++ b/rust/operator-binary/src/resource/configmap.rs @@ -18,8 +18,8 @@ use crate::{ listener::{KafkaListenerConfig, KafkaListenerName}, role::{ AnyConfig, KAFKA_ADVERTISED_LISTENERS, KAFKA_CONTROLLER_QUORUM_BOOTSTRAP_SERVERS, - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP, KAFKA_LISTENERS, KAFKA_LOG_DIRS, KAFKA_NODE_ID, - KAFKA_PROCESS_ROLES, KafkaRole, + KAFKA_CONTROLLER_QUORUM_VOTERS, KAFKA_LISTENER_SECURITY_PROTOCOL_MAP, KAFKA_LISTENERS, + KAFKA_LOG_DIRS, KAFKA_NODE_ID, KAFKA_PROCESS_ROLES, KafkaRole, }, security::KafkaTlsSecurity, v1alpha1, @@ -94,6 +94,7 @@ pub fn build_rolegroup_config_map( pod_descriptors, listener_config, opa_connect_string, + resolved_product_image.product_version.starts_with("3.7"), // needs_quorum_voters )?; // Need to call this to get configOverrides :( @@ -198,6 +199,7 @@ fn server_properties_file( pod_descriptors: &[KafkaPodDescriptor], listener_config: &KafkaListenerConfig, opa_connect_string: Option<&str>, + needs_quorum_voters: bool, ) -> Result, Error> { let kraft_controllers = kraft_controllers(pod_descriptors); @@ -209,7 +211,7 @@ fn server_properties_file( KafkaRole::Controller => { let kraft_controllers = kraft_controllers.context(NoKraftControllersFoundSnafu)?; - Ok(BTreeMap::from([ + let mut result = BTreeMap::from([ ( KAFKA_LOG_DIRS.to_string(), "/stackable/data/kraft".to_string(), @@ -227,10 +229,6 @@ fn server_properties_file( KAFKA_CONTROLLER_QUORUM_BOOTSTRAP_SERVERS.to_string(), kraft_controllers.clone(), ), - // TODO: figure this out - //(KAFKA_CONTROLLER_QUORUM_VOTERS.to_string(), - //kraft_controllers, - //), ( KAFKA_LISTENERS.to_string(), "CONTROLLER://${env:POD_NAME}.${env:ROLEGROUP_HEADLESS_SERVICE_NAME}.${env:NAMESPACE}.svc.${env:CLUSTER_DOMAIN}:${env:KAFKA_CLIENT_PORT}".to_string(), @@ -240,7 +238,16 @@ fn server_properties_file( listener_config .listener_security_protocol_map_for_listener(&KafkaListenerName::Controller) .unwrap_or("".to_string())), - ])) + ]); + + if needs_quorum_voters { + let kraft_voters = + kraft_voters(pod_descriptors).context(NoKraftControllersFoundSnafu)?; + + result.extend([(KAFKA_CONTROLLER_QUORUM_VOTERS.to_string(), kraft_voters)]); + } + + Ok(result) } KafkaRole::Broker => { let mut result = BTreeMap::from([ @@ -278,6 +285,13 @@ fn server_properties_file( kraft_controllers.clone(), ), ]); + + if needs_quorum_voters { + let kraft_voters = + kraft_voters(pod_descriptors).context(NoKraftControllersFoundSnafu)?; + + result.extend([(KAFKA_CONTROLLER_QUORUM_VOTERS.to_string(), kraft_voters)]); + } } else { // Running with ZooKeeper enabled result.extend([( @@ -313,7 +327,28 @@ fn kraft_controllers(pod_descriptors: &[KafkaPodDescriptor]) -> Option { let result = pod_descriptors .iter() .filter(|pd| pd.role == KafkaRole::Controller.to_string()) - .map(|desc| format!("{fqdn}:${{env:KAFKA_CLIENT_PORT}}", fqdn = desc.fqdn())) + .map(|desc| { + format!( + "{fqdn}:{client_port}", + fqdn = desc.fqdn(), + client_port = desc.client_port + ) + }) + .collect::>() + .join(","); + + if result.is_empty() { + None + } else { + Some(result) + } +} + +fn kraft_voters(pod_descriptors: &[KafkaPodDescriptor]) -> Option { + let result = pod_descriptors + .iter() + .filter(|pd| pd.role == KafkaRole::Controller.to_string()) + .map(|desc| desc.as_quorum_voter()) .collect::>() .join(","); diff --git a/rust/operator-binary/src/resource/statefulset.rs b/rust/operator-binary/src/resource/statefulset.rs index 0c52689b..89154dad 100644 --- a/rust/operator-binary/src/resource/statefulset.rs +++ b/rust/operator-binary/src/resource/statefulset.rs @@ -299,7 +299,11 @@ pub fn build_broker_rolegroup_statefulset( cluster_id, // we need controller pods kafka - .pod_descriptors(Some(&KafkaRole::Controller), cluster_info) + .pod_descriptors( + Some(&KafkaRole::Controller), + cluster_info, + kafka_security.client_port(), + ) .context(BuildPodDescriptorsSnafu)?, kafka_security, &resolved_product_image.product_version, @@ -642,9 +646,8 @@ pub fn build_controller_rolegroup_statefulset( .args(vec![controller_kafka_container_command( kafka.cluster_id().context(ClusterIdMissingSnafu)?, kafka - .pod_descriptors(Some(kafka_role), cluster_info) + .pod_descriptors(Some(kafka_role), cluster_info, kafka_security.client_port()) .context(BuildPodDescriptorsSnafu)?, - kafka_security, &resolved_product_image.product_version, )]) .add_env_var("PRE_STOP_CONTROLLER_SLEEP_SECONDS", "10") From 01b1a05748fb9cfa81ab70e1b6ad1b9325cd6dda Mon Sep 17 00:00:00 2001 From: Razvan-Daniel Mihai <84674+razvan@users.noreply.github.com> Date: Tue, 11 Nov 2025 12:55:43 +0100 Subject: [PATCH 03/14] remove controller listener from broker config --- rust/operator-binary/src/crd/security.rs | 25 ------------------------ 1 file changed, 25 deletions(-) diff --git a/rust/operator-binary/src/crd/security.rs b/rust/operator-binary/src/crd/security.rs index 6c482eb8..6b6234fc 100644 --- a/rust/operator-binary/src/crd/security.rs +++ b/rust/operator-binary/src/crd/security.rs @@ -647,31 +647,6 @@ impl KafkaTlsSecurity { KafkaListenerName::Internal.listener_ssl_truststore_type(), "PKCS12".to_string(), ); - // CONTROLLERS - config.insert( - KafkaListenerName::Controller.listener_ssl_keystore_location(), - format!("{}/keystore.p12", Self::STACKABLE_TLS_KAFKA_INTERNAL_DIR), - ); - config.insert( - KafkaListenerName::Controller.listener_ssl_keystore_password(), - Self::SSL_STORE_PASSWORD.to_string(), - ); - config.insert( - KafkaListenerName::Controller.listener_ssl_keystore_type(), - "PKCS12".to_string(), - ); - config.insert( - KafkaListenerName::Controller.listener_ssl_truststore_location(), - format!("{}/truststore.p12", Self::STACKABLE_TLS_KAFKA_INTERNAL_DIR), - ); - config.insert( - KafkaListenerName::Controller.listener_ssl_truststore_password(), - Self::SSL_STORE_PASSWORD.to_string(), - ); - config.insert( - KafkaListenerName::Controller.listener_ssl_truststore_type(), - "PKCS12".to_string(), - ); // client auth required config.insert( KafkaListenerName::Internal.listener_ssl_client_auth(), From 1ce85b895320e92f102e91630d42f16fe67bd4b5 Mon Sep 17 00:00:00 2001 From: Razvan-Daniel Mihai <84674+razvan@users.noreply.github.com> Date: Tue, 11 Nov 2025 14:29:34 +0100 Subject: [PATCH 04/14] move jaas properties from the cmd line to config file --- rust/operator-binary/src/config/command.rs | 46 ++++---------- rust/operator-binary/src/crd/listener.rs | 10 +++ rust/operator-binary/src/crd/security.rs | 16 +++-- rust/operator-binary/src/kerberos.rs | 2 +- .../operator-binary/src/resource/configmap.rs | 61 ++++++++++++++++++- 5 files changed, 92 insertions(+), 43 deletions(-) diff --git a/rust/operator-binary/src/config/command.rs b/rust/operator-binary/src/config/command.rs index 2000940e..b1b71635 100644 --- a/rust/operator-binary/src/config/command.rs +++ b/rust/operator-binary/src/config/command.rs @@ -9,9 +9,7 @@ use stackable_operator::{ use crate::{ crd::{ KafkaPodDescriptor, STACKABLE_CONFIG_DIR, STACKABLE_KERBEROS_KRB5_PATH, - STACKABLE_LISTENER_BOOTSTRAP_DIR, STACKABLE_LISTENER_BROKER_DIR, - listener::{KafkaListenerName, node_address_cmd}, - role::{KafkaRole, broker::BROKER_PROPERTIES_FILE, controller::CONTROLLER_PROPERTIES_FILE}, + role::{broker::BROKER_PROPERTIES_FILE, controller::CONTROLLER_PROPERTIES_FILE}, security::KafkaTlsSecurity, v1alpha1, }, @@ -44,7 +42,7 @@ pub fn broker_kafka_container_commands( true => format!("export KERBEROS_REALM=$(grep -oP 'default_realm = \\K.*' {STACKABLE_KERBEROS_KRB5_PATH})"), false => "".to_string(), }, - broker_start_command = broker_start_command(kafka, cluster_id, controller_descriptors, kafka_security, product_version), + broker_start_command = broker_start_command(kafka, cluster_id, controller_descriptors, product_version), } } @@ -52,38 +50,21 @@ fn broker_start_command( kafka: &v1alpha1::KafkaCluster, cluster_id: &str, controller_descriptors: Vec, - kafka_security: &KafkaTlsSecurity, product_version: &str, ) -> String { - let jaas_config = match kafka_security.has_kerberos_enabled() { - true => { - formatdoc! {" - --override \"{client_jaas_config}=com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true isInitiator=false keyTab=\\\"/stackable/kerberos/keytab\\\" principal=\\\"{service_name}/{broker_address}@$KERBEROS_REALM\\\";\" \ - --override \"{bootstrap_jaas_config}=com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true isInitiator=false keyTab=\\\"/stackable/kerberos/keytab\\\" principal=\\\"{service_name}/{bootstrap_address}@$KERBEROS_REALM\\\";\" - ", - client_jaas_config = KafkaListenerName::Client.listener_gssapi_sasl_jaas_config(), - bootstrap_jaas_config = KafkaListenerName::Bootstrap.listener_gssapi_sasl_jaas_config(), - service_name = KafkaRole::Broker.kerberos_service_name(), - broker_address = node_address_cmd(STACKABLE_LISTENER_BROKER_DIR), - bootstrap_address = node_address_cmd(STACKABLE_LISTENER_BOOTSTRAP_DIR), - } - } - false => "".to_string(), - }; - - // This should be improved: - // - mount emptyDir as readWriteConfig if kafka.is_controller_configured() { formatdoc! {" POD_INDEX=$(echo \"$POD_NAME\" | grep -oE '[0-9]+$') export REPLICA_ID=$((POD_INDEX+NODE_ID_OFFSET)) cp {config_dir}/{properties_file} /tmp/{properties_file} - config-utils template /tmp/{properties_file} + cp {config_dir}/jaas.properties /tmp/jaas.properties + config-utils template /tmp/jaas.properties + bin/kafka-storage.sh format --cluster-id {cluster_id} --config /tmp/{properties_file} --ignore-formatted {initial_controller_command} - bin/kafka-server-start.sh /tmp/{properties_file} {jaas_config} & + bin/kafka-server-start.sh /tmp/{properties_file} & ", config_dir = STACKABLE_CONFIG_DIR, properties_file = BROKER_PROPERTIES_FILE, @@ -92,12 +73,12 @@ fn broker_start_command( } else { formatdoc! {" cp {config_dir}/{properties_file} /tmp/{properties_file} - config-utils template /tmp/{properties_file} - bin/kafka-server-start.sh /tmp/{properties_file} \ - {jaas_config} \ - &", + cp {config_dir}/jaas.properties /tmp/jaas.properties + config-utils template /tmp/jaas.properties + + bin/kafka-server-start.sh /tmp/{properties_file} &", config_dir = STACKABLE_CONFIG_DIR, properties_file = BROKER_PROPERTIES_FILE, } @@ -112,7 +93,7 @@ fn broker_start_command( // The environment variable `PRE_STOP_CONTROLLER_SLEEP_SECONDS` delays the termination of the // controller processes to give the brokers more time to offload data and shutdown gracefully. // Kubernetes has a built in `pre-stop` hook feature that is not yet generally available on all platforms -// supported by the operator. +// supported by the operator.http://app.sl/ const BASH_TRAP_FUNCTIONS: &str = r#" prepare_signal_handlers() { @@ -151,11 +132,6 @@ pub fn controller_kafka_container_command( controller_descriptors: Vec, product_version: &str, ) -> String { - // TODO: The properties file from the configmap is copied to the /tmp folder and appended with dynamic properties - // This should be improved: - // - mount emptyDir as readWriteConfig - // - use config-utils for proper replacements? - // - should we print the adapted properties file at startup? formatdoc! {" {BASH_TRAP_FUNCTIONS} {remove_vector_shutdown_file_command} diff --git a/rust/operator-binary/src/crd/listener.rs b/rust/operator-binary/src/crd/listener.rs index 66c9bbb6..6daf7d67 100644 --- a/rust/operator-binary/src/crd/listener.rs +++ b/rust/operator-binary/src/crd/listener.rs @@ -100,12 +100,14 @@ impl KafkaListenerName { ) } + /* pub fn listener_gssapi_sasl_jaas_config(&self) -> String { format!( "listener.name.{listener_name}.gssapi.sasl.jaas.config", listener_name = self.to_string().to_lowercase() ) } + */ } #[derive(Debug)] @@ -330,6 +332,14 @@ pub fn get_kafka_listener_config( }) } +pub fn node_address_cmd_env(directory: &str) -> String { + format!("$(cat {directory}/default-address/address)") +} + +pub fn node_port_cmd_env(directory: &str, port_name: &str) -> String { + format!("$(cat {directory}/default-address/ports/{port_name})") +} + pub fn node_address_cmd(directory: &str) -> String { format!("${{file:UTF-8:{directory}/default-address/address}}") } diff --git a/rust/operator-binary/src/crd/security.rs b/rust/operator-binary/src/crd/security.rs index 6b6234fc..07574087 100644 --- a/rust/operator-binary/src/crd/security.rs +++ b/rust/operator-binary/src/crd/security.rs @@ -22,12 +22,12 @@ use stackable_operator::{ shared::time::Duration, }; -use super::listener::{KafkaListenerProtocol, node_port_cmd}; +use super::listener::KafkaListenerProtocol; use crate::crd::{ LISTENER_BOOTSTRAP_VOLUME_NAME, LISTENER_BROKER_VOLUME_NAME, STACKABLE_KERBEROS_KRB5_PATH, STACKABLE_LISTENER_BROKER_DIR, authentication::{self, ResolvedAuthenticationClasses}, - listener::{self, KafkaListenerName, node_address_cmd}, + listener::{self, KafkaListenerName, node_address_cmd_env, node_port_cmd_env}, role::KafkaRole, tls, v1alpha1, }; @@ -247,7 +247,6 @@ impl KafkaTlsSecurity { args.push("-L".to_string()); } else if self.has_kerberos_enabled() { let service_name = KafkaRole::Broker.kerberos_service_name(); - let broker_port = node_port_cmd(STACKABLE_LISTENER_BROKER_DIR, self.client_port_name()); // here we need to specify a shell so that variable substitution will work // see e.g. https://github.com/kubernetes-client/python/blob/master/kubernetes/docs/V1ExecAction.md args.push("/bin/bash".to_string()); @@ -269,13 +268,20 @@ impl KafkaTlsSecurity { bash_args.push( format!( "export POD_BROKER_LISTENER_ADDRESS={};", - node_address_cmd(STACKABLE_LISTENER_BROKER_DIR) + node_address_cmd_env(STACKABLE_LISTENER_BROKER_DIR) + ) + .to_string(), + ); + bash_args.push( + format!( + "export POD_BROKER_LISTENER_PORT={};", + node_port_cmd_env(STACKABLE_LISTENER_BROKER_DIR, self.client_port_name()) ) .to_string(), ); bash_args.push("/stackable/kcat".to_string()); bash_args.push("-b".to_string()); - bash_args.push(format!("$POD_BROKER_LISTENER_ADDRESS:{broker_port}")); + bash_args.push("$POD_BROKER_LISTENER_ADDRESS:$POD_BROKER_LISTENER_PORT".to_string()); bash_args.extend(Self::kcat_client_sasl_ssl( Self::STACKABLE_TLS_KCAT_DIR, service_name, diff --git a/rust/operator-binary/src/kerberos.rs b/rust/operator-binary/src/kerberos.rs index e22de94a..dbde82be 100644 --- a/rust/operator-binary/src/kerberos.rs +++ b/rust/operator-binary/src/kerberos.rs @@ -61,7 +61,7 @@ pub fn add_kerberos_pod_config( cb.add_env_var("KRB5_CONFIG", STACKABLE_KERBEROS_KRB5_PATH); cb.add_env_var( "KAFKA_OPTS", - format!("-Djava.security.krb5.conf={STACKABLE_KERBEROS_KRB5_PATH}",), + format!("-Djava.security.auth.login.config=/tmp/jaas.properties -Djava.security.krb5.conf={STACKABLE_KERBEROS_KRB5_PATH}",), ); } } diff --git a/rust/operator-binary/src/resource/configmap.rs b/rust/operator-binary/src/resource/configmap.rs index 6d657593..b631d6e9 100644 --- a/rust/operator-binary/src/resource/configmap.rs +++ b/rust/operator-binary/src/resource/configmap.rs @@ -3,6 +3,7 @@ use std::{ str::FromStr, }; +use indoc::formatdoc; use product_config::{types::PropertyNameKind, writer::to_java_properties_string}; use snafu::{OptionExt, ResultExt, Snafu}; use stackable_operator::{ @@ -14,8 +15,9 @@ use stackable_operator::{ use crate::{ crd::{ - JVM_SECURITY_PROPERTIES_FILE, KafkaPodDescriptor, - listener::{KafkaListenerConfig, KafkaListenerName}, + JVM_SECURITY_PROPERTIES_FILE, KafkaPodDescriptor, STACKABLE_LISTENER_BOOTSTRAP_DIR, + STACKABLE_LISTENER_BROKER_DIR, + listener::{KafkaListenerConfig, KafkaListenerName, node_address_cmd}, role::{ AnyConfig, KAFKA_ADVERTISED_LISTENERS, KAFKA_CONTROLLER_QUORUM_BOOTSTRAP_SERVERS, KAFKA_CONTROLLER_QUORUM_VOTERS, KAFKA_LISTENER_SECURITY_PROTOCOL_MAP, KAFKA_LISTENERS, @@ -71,6 +73,9 @@ pub enum Error { source: strum::ParseError, name: String, }, + + #[snafu(display("failed to build jaas configuration file for {}", rolegroup))] + BuildJaasConfig { rolegroup: String }, } /// The rolegroup [`ConfigMap`] configures the rolegroup based on the configuration given by the administrator @@ -174,6 +179,14 @@ pub fn build_rolegroup_config_map( .with_context(|_| JvmSecurityPopertiesSnafu { rolegroup: rolegroup.role_group.clone(), })?, + ) + // This file contains the JAAS configuration for Kerberos authentication + // It has the ".properties" extension but is not a Java properties file. + // It is processed by `config-utils` to substitute "env:" and "file:" variables + // and this tool currently doesn't support the JAAS login configuration format. + .add_data( + "jaas.properties", + jaas_config_file(kafka_security.has_kerberos_enabled()), ); tracing::debug!(?kafka_config, "Applied kafka config"); @@ -358,3 +371,47 @@ fn kraft_voters(pod_descriptors: &[KafkaPodDescriptor]) -> Option { Some(result) } } + +// Generate JAAS configuration file for Kerberos authentication +// or an empty string if Kerberos is not enabled. +// See https://docs.oracle.com/javase/8/docs/technotes/guides/security/jgss/tutorials/LoginConfigFile.html +fn jaas_config_file(is_kerberos_enabled: bool) -> String { + match is_kerberos_enabled { + false => String::new(), + true => formatdoc! {" + bootstrap.KafkaServer {{ + com.sun.security.auth.module.Krb5LoginModule required + useKeyTab=true + storeKey=true + isInitiator=false + keyTab=\"/stackable/kerberos/keytab\" + principal=\"kafka/{bootstrap_address}@${{env:KERBEROS_REALM}}\"; + }}; + + client.KafkaServer {{ + com.sun.security.auth.module.Krb5LoginModule required + useKeyTab=true + storeKey=true + isInitiator=false + keyTab=\"/stackable/kerberos/keytab\" + principal=\"kafka/{broker_address}@${{env:KERBEROS_REALM}}\"; + }}; + + ", + bootstrap_address = node_address_cmd(STACKABLE_LISTENER_BOOTSTRAP_DIR), + broker_address = node_address_cmd(STACKABLE_LISTENER_BROKER_DIR), + }, + } +} +/* + KafkaClient {{ + com.sun.security.auth.module.Krb5LoginModule required + useKeyTab=true + storeKey=true + isInitiator=false + keyTab=\"/stackable/kerberos/keytab\" + principal=\"kafka/{node_address}@${{env:KERBEROS_REALM}}\"; + }}; + + bootstrap_address = node_address_cmd(STACKABLE_LISTENER_BOOTSTRAP_DIR), +*/ From d02eebfce7bfbbc3d486e0f597a07806d6580167 Mon Sep 17 00:00:00 2001 From: Razvan-Daniel Mihai <84674+razvan@users.noreply.github.com> Date: Tue, 11 Nov 2025 14:33:25 +0100 Subject: [PATCH 05/14] remove comment --- rust/operator-binary/src/resource/configmap.rs | 12 ------------ 1 file changed, 12 deletions(-) diff --git a/rust/operator-binary/src/resource/configmap.rs b/rust/operator-binary/src/resource/configmap.rs index b631d6e9..ce3dd253 100644 --- a/rust/operator-binary/src/resource/configmap.rs +++ b/rust/operator-binary/src/resource/configmap.rs @@ -403,15 +403,3 @@ fn jaas_config_file(is_kerberos_enabled: bool) -> String { }, } } -/* - KafkaClient {{ - com.sun.security.auth.module.Krb5LoginModule required - useKeyTab=true - storeKey=true - isInitiator=false - keyTab=\"/stackable/kerberos/keytab\" - principal=\"kafka/{node_address}@${{env:KERBEROS_REALM}}\"; - }}; - - bootstrap_address = node_address_cmd(STACKABLE_LISTENER_BOOTSTRAP_DIR), -*/ From 1f3579a76b0ad8935f82a059005e0eb039f32638 Mon Sep 17 00:00:00 2001 From: Razvan-Daniel Mihai <84674+razvan@users.noreply.github.com> Date: Tue, 11 Nov 2025 14:37:10 +0100 Subject: [PATCH 06/14] update changelog --- CHANGELOG.md | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index a7951bb6..4e062966 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,12 @@ All notable changes to this project will be documented in this file. ## [Unreleased] +### Changed + +- Refactor: move server configuration properties from the command line to configuration files. ([#911]). + +[#911]: https://github.com/stackabletech/kafka-operator/pull/911 + ## [25.11.0] - 2025-11-07 ## [25.11.0-rc1] - 2025-11-06 From 85f9c32015c57f5ca5b43f8aa428a2f59eb28f11 Mon Sep 17 00:00:00 2001 From: Razvan-Daniel Mihai <84674+razvan@users.noreply.github.com> Date: Tue, 11 Nov 2025 14:55:08 +0100 Subject: [PATCH 07/14] remove typo --- rust/operator-binary/src/config/command.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rust/operator-binary/src/config/command.rs b/rust/operator-binary/src/config/command.rs index b1b71635..b2f31e8a 100644 --- a/rust/operator-binary/src/config/command.rs +++ b/rust/operator-binary/src/config/command.rs @@ -93,7 +93,7 @@ fn broker_start_command( // The environment variable `PRE_STOP_CONTROLLER_SLEEP_SECONDS` delays the termination of the // controller processes to give the brokers more time to offload data and shutdown gracefully. // Kubernetes has a built in `pre-stop` hook feature that is not yet generally available on all platforms -// supported by the operator.http://app.sl/ +// supported by the operator. const BASH_TRAP_FUNCTIONS: &str = r#" prepare_signal_handlers() { From 8f3a06a4b68b44db1c4259f9881115a0183ea523 Mon Sep 17 00:00:00 2001 From: Razvan-Daniel Mihai <84674+razvan@users.noreply.github.com> Date: Tue, 11 Nov 2025 15:06:42 +0100 Subject: [PATCH 08/14] small tweaks --- rust/operator-binary/src/crd/mod.rs | 8 +++----- rust/operator-binary/src/resource/configmap.rs | 1 + 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/rust/operator-binary/src/crd/mod.rs b/rust/operator-binary/src/crd/mod.rs index fa2804d3..6b3e0474 100644 --- a/rust/operator-binary/src/crd/mod.rs +++ b/rust/operator-binary/src/crd/mod.rs @@ -254,9 +254,8 @@ impl v1alpha1::KafkaCluster { }) } - /// List pod descriptors for a given role. - /// If no role is provided, pod descriptors for all roles are listed. - /// + /// List pod descriptors for a given role and all it's groups. + /// If no role is provided, pod descriptors for all roles (and all groups) are listed. /// We try to predict the pods here rather than looking at the current cluster state in order to /// avoid instance churn. pub fn pod_descriptors( @@ -292,8 +291,7 @@ impl v1alpha1::KafkaCluster { }; // only return descriptors for selected role - if requested_kafka_role.is_none() || ¤t_role == requested_kafka_role.unwrap() - { + if let Some(current_role) = requested_kafka_role { for replica in 0..replicas { pod_descriptors.push(KafkaPodDescriptor { namespace: namespace.clone(), diff --git a/rust/operator-binary/src/resource/configmap.rs b/rust/operator-binary/src/resource/configmap.rs index ce3dd253..2e7de40f 100644 --- a/rust/operator-binary/src/resource/configmap.rs +++ b/rust/operator-binary/src/resource/configmap.rs @@ -206,6 +206,7 @@ pub fn build_rolegroup_config_map( }) } +// Generate the content of both server.properties and controller.properties files. fn server_properties_file( kraft_mode: bool, role: &str, From 5c7eda0de3f30d53e29198fb4cf34b4119605634 Mon Sep 17 00:00:00 2001 From: Razvan-Daniel Mihai <84674+razvan@users.noreply.github.com> Date: Tue, 11 Nov 2025 17:32:03 +0100 Subject: [PATCH 09/14] revert brainfuck --- rust/operator-binary/src/crd/mod.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/rust/operator-binary/src/crd/mod.rs b/rust/operator-binary/src/crd/mod.rs index 6b3e0474..1ee7ba9f 100644 --- a/rust/operator-binary/src/crd/mod.rs +++ b/rust/operator-binary/src/crd/mod.rs @@ -290,8 +290,9 @@ impl v1alpha1::KafkaCluster { } }; - // only return descriptors for selected role - if let Some(current_role) = requested_kafka_role { + // If no specific role is requested, or the current role matches the requested one, add pod descriptors + if requested_kafka_role.is_none() || ¤t_role == requested_kafka_role.unwrap() + { for replica in 0..replicas { pod_descriptors.push(KafkaPodDescriptor { namespace: namespace.clone(), From 47abc7ea38df67aa68cc9fe3ce8f871910be3a25 Mon Sep 17 00:00:00 2001 From: Razvan-Daniel Mihai <84674+razvan@users.noreply.github.com> Date: Wed, 12 Nov 2025 09:31:01 +0100 Subject: [PATCH 10/14] Revert "remove controller listener from broker config" This reverts commit 01b1a05748fb9cfa81ab70e1b6ad1b9325cd6dda. --- rust/operator-binary/src/crd/security.rs | 25 ++++++++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/rust/operator-binary/src/crd/security.rs b/rust/operator-binary/src/crd/security.rs index 07574087..325f95fc 100644 --- a/rust/operator-binary/src/crd/security.rs +++ b/rust/operator-binary/src/crd/security.rs @@ -653,6 +653,31 @@ impl KafkaTlsSecurity { KafkaListenerName::Internal.listener_ssl_truststore_type(), "PKCS12".to_string(), ); + // CONTROLLERS + config.insert( + KafkaListenerName::Controller.listener_ssl_keystore_location(), + format!("{}/keystore.p12", Self::STACKABLE_TLS_KAFKA_INTERNAL_DIR), + ); + config.insert( + KafkaListenerName::Controller.listener_ssl_keystore_password(), + Self::SSL_STORE_PASSWORD.to_string(), + ); + config.insert( + KafkaListenerName::Controller.listener_ssl_keystore_type(), + "PKCS12".to_string(), + ); + config.insert( + KafkaListenerName::Controller.listener_ssl_truststore_location(), + format!("{}/truststore.p12", Self::STACKABLE_TLS_KAFKA_INTERNAL_DIR), + ); + config.insert( + KafkaListenerName::Controller.listener_ssl_truststore_password(), + Self::SSL_STORE_PASSWORD.to_string(), + ); + config.insert( + KafkaListenerName::Controller.listener_ssl_truststore_type(), + "PKCS12".to_string(), + ); // client auth required config.insert( KafkaListenerName::Internal.listener_ssl_client_auth(), From 04d0ef33dbd4c0fcf5f723a3df3b64e2b7b6d6d4 Mon Sep 17 00:00:00 2001 From: Razvan-Daniel Mihai <84674+razvan@users.noreply.github.com> Date: Wed, 12 Nov 2025 14:10:41 +0100 Subject: [PATCH 11/14] Apply suggestions from code review Co-authored-by: Malte Sander --- rust/operator-binary/src/crd/mod.rs | 2 +- rust/operator-binary/src/resource/configmap.rs | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/rust/operator-binary/src/crd/mod.rs b/rust/operator-binary/src/crd/mod.rs index 1ee7ba9f..951e13d3 100644 --- a/rust/operator-binary/src/crd/mod.rs +++ b/rust/operator-binary/src/crd/mod.rs @@ -254,7 +254,7 @@ impl v1alpha1::KafkaCluster { }) } - /// List pod descriptors for a given role and all it's groups. + /// List pod descriptors for a given role and all its rolegroups. /// If no role is provided, pod descriptors for all roles (and all groups) are listed. /// We try to predict the pods here rather than looking at the current cluster state in order to /// avoid instance churn. diff --git a/rust/operator-binary/src/resource/configmap.rs b/rust/operator-binary/src/resource/configmap.rs index 2e7de40f..8e3fe8da 100644 --- a/rust/operator-binary/src/resource/configmap.rs +++ b/rust/operator-binary/src/resource/configmap.rs @@ -74,7 +74,7 @@ pub enum Error { name: String, }, - #[snafu(display("failed to build jaas configuration file for {}", rolegroup))] + #[snafu(display("failed to build jaas configuration file for {rolegroup}"))] BuildJaasConfig { rolegroup: String }, } @@ -206,7 +206,7 @@ pub fn build_rolegroup_config_map( }) } -// Generate the content of both server.properties and controller.properties files. +// Generate the content of both broker.properties and controller.properties files. fn server_properties_file( kraft_mode: bool, role: &str, From 59c9f3f0965b53ffcb60b2c77d3954c276160e43 Mon Sep 17 00:00:00 2001 From: Razvan-Daniel Mihai <84674+razvan@users.noreply.github.com> Date: Wed, 12 Nov 2025 14:14:19 +0100 Subject: [PATCH 12/14] Update rust/operator-binary/src/crd/mod.rs Co-authored-by: Malte Sander --- rust/operator-binary/src/crd/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rust/operator-binary/src/crd/mod.rs b/rust/operator-binary/src/crd/mod.rs index 951e13d3..cef80193 100644 --- a/rust/operator-binary/src/crd/mod.rs +++ b/rust/operator-binary/src/crd/mod.rs @@ -291,7 +291,7 @@ impl v1alpha1::KafkaCluster { }; // If no specific role is requested, or the current role matches the requested one, add pod descriptors - if requested_kafka_role.is_none() || ¤t_role == requested_kafka_role.unwrap() + if requested_kafka_role.is_none() || Some(¤t_role) == requested_kafka_role { { for replica in 0..replicas { pod_descriptors.push(KafkaPodDescriptor { From 0a814d8f6e210af305b342174905915ea21a4a6e Mon Sep 17 00:00:00 2001 From: Razvan-Daniel Mihai <84674+razvan@users.noreply.github.com> Date: Wed, 12 Nov 2025 14:19:17 +0100 Subject: [PATCH 13/14] remove commented out code --- rust/operator-binary/src/crd/listener.rs | 9 --------- rust/operator-binary/src/crd/mod.rs | 1 - 2 files changed, 10 deletions(-) diff --git a/rust/operator-binary/src/crd/listener.rs b/rust/operator-binary/src/crd/listener.rs index 6daf7d67..12b35d62 100644 --- a/rust/operator-binary/src/crd/listener.rs +++ b/rust/operator-binary/src/crd/listener.rs @@ -99,15 +99,6 @@ impl KafkaListenerName { listener_name = self.to_string().to_lowercase() ) } - - /* - pub fn listener_gssapi_sasl_jaas_config(&self) -> String { - format!( - "listener.name.{listener_name}.gssapi.sasl.jaas.config", - listener_name = self.to_string().to_lowercase() - ) - } - */ } #[derive(Debug)] diff --git a/rust/operator-binary/src/crd/mod.rs b/rust/operator-binary/src/crd/mod.rs index cef80193..e8ea4852 100644 --- a/rust/operator-binary/src/crd/mod.rs +++ b/rust/operator-binary/src/crd/mod.rs @@ -292,7 +292,6 @@ impl v1alpha1::KafkaCluster { // If no specific role is requested, or the current role matches the requested one, add pod descriptors if requested_kafka_role.is_none() || Some(¤t_role) == requested_kafka_role { - { for replica in 0..replicas { pod_descriptors.push(KafkaPodDescriptor { namespace: namespace.clone(), From 5830389c54d0951fdd29021d08e7be8a57a4f2b2 Mon Sep 17 00:00:00 2001 From: Razvan-Daniel Mihai <84674+razvan@users.noreply.github.com> Date: Wed, 12 Nov 2025 14:40:30 +0100 Subject: [PATCH 14/14] rearange config map properties --- rust/operator-binary/src/crd/role/broker.rs | 45 ++----------------- .../operator-binary/src/resource/configmap.rs | 16 +++---- 2 files changed, 12 insertions(+), 49 deletions(-) diff --git a/rust/operator-binary/src/crd/role/broker.rs b/rust/operator-binary/src/crd/role/broker.rs index 837ddd80..00d614b9 100644 --- a/rust/operator-binary/src/crd/role/broker.rs +++ b/rust/operator-binary/src/crd/role/broker.rs @@ -15,11 +15,7 @@ use stackable_operator::{ use strum::{Display, EnumIter}; use crate::crd::{ - listener::KafkaListenerName, - role::{ - KAFKA_LOG_DIRS, KAFKA_PROCESS_ROLES, KafkaRole, - commons::{CommonConfig, Storage, StorageFragment}, - }, + role::commons::{CommonConfig, Storage, StorageFragment}, v1alpha1, }; @@ -129,44 +125,11 @@ impl Configuration for BrokerConfigFragment { fn compute_files( &self, - resource: &Self::Configurable, + _resource: &Self::Configurable, _role_name: &str, - file: &str, + _file: &str, ) -> Result>, stackable_operator::product_config_utils::Error> { - let mut config = BTreeMap::new(); - - if file == BROKER_PROPERTIES_FILE { - config.insert( - KAFKA_LOG_DIRS.to_string(), - Some("/stackable/data/topicdata".to_string()), - ); - - // KRAFT - if resource.is_controller_configured() { - config.insert( - KAFKA_PROCESS_ROLES.to_string(), - Some(KafkaRole::Broker.to_string()), - ); - - config.insert( - "controller.listener.names".to_string(), - Some(KafkaListenerName::Controller.to_string()), - ); - } - // OPA - if resource.spec.cluster_config.authorization.opa.is_some() { - config.insert( - "authorizer.class.name".to_string(), - Some("org.openpolicyagent.kafka.OpaAuthorizer".to_string()), - ); - config.insert( - "opa.authorizer.metrics.enabled".to_string(), - Some("true".to_string()), - ); - } - } - - Ok(config) + Ok(BTreeMap::new()) } } diff --git a/rust/operator-binary/src/resource/configmap.rs b/rust/operator-binary/src/resource/configmap.rs index 8e3fe8da..ded83c59 100644 --- a/rust/operator-binary/src/resource/configmap.rs +++ b/rust/operator-binary/src/resource/configmap.rs @@ -102,14 +102,6 @@ pub fn build_rolegroup_config_map( resolved_product_image.product_version.starts_with("3.7"), // needs_quorum_voters )?; - // Need to call this to get configOverrides :( - kafka_config.extend( - rolegroup_config - .get(&PropertyNameKind::File(kafka_config_file_name.to_string())) - .cloned() - .unwrap_or_default(), - ); - match merged_config { AnyConfig::Broker(_) => kafka_config.extend(kafka_security.broker_config_settings()), AnyConfig::Controller(_) => { @@ -119,6 +111,14 @@ pub fn build_rolegroup_config_map( kafka_config.extend(graceful_shutdown_config_properties()); + // Need to call this to get configOverrides :( + kafka_config.extend( + rolegroup_config + .get(&PropertyNameKind::File(kafka_config_file_name.to_string())) + .cloned() + .unwrap_or_default(), + ); + let kafka_config = kafka_config .into_iter() .map(|(k, v)| (k, Some(v)))