diff --git a/CHANGELOG.md b/CHANGELOG.md
index a7951bb6..4e062966 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -4,6 +4,12 @@ All notable changes to this project will be documented in this file.
 
 ## [Unreleased]
 
+### Changed
+
+- Refactor: move server configuration properties from the command line to configuration files ([#911]).
+
+[#911]: https://github.com/stackabletech/kafka-operator/pull/911
+
 ## [25.11.0] - 2025-11-07
 
 ## [25.11.0-rc1] - 2025-11-06
diff --git a/rust/operator-binary/src/config/command.rs b/rust/operator-binary/src/config/command.rs
index 95281baa..b2f31e8a 100644
--- a/rust/operator-binary/src/config/command.rs
+++ b/rust/operator-binary/src/config/command.rs
@@ -9,14 +9,7 @@ use stackable_operator::{
 use crate::{
     crd::{
         KafkaPodDescriptor, STACKABLE_CONFIG_DIR, STACKABLE_KERBEROS_KRB5_PATH,
-        STACKABLE_LISTENER_BOOTSTRAP_DIR, STACKABLE_LISTENER_BROKER_DIR,
-        listener::{KafkaListenerConfig, KafkaListenerName, node_address_cmd},
-        role::{
-            KAFKA_ADVERTISED_LISTENERS, KAFKA_CONTROLLER_QUORUM_BOOTSTRAP_SERVERS,
-            KAFKA_CONTROLLER_QUORUM_VOTERS, KAFKA_LISTENER_SECURITY_PROTOCOL_MAP, KAFKA_LISTENERS,
-            KAFKA_NODE_ID, KAFKA_NODE_ID_OFFSET, KafkaRole, broker::BROKER_PROPERTIES_FILE,
-            controller::CONTROLLER_PROPERTIES_FILE,
-        },
+        role::{broker::BROKER_PROPERTIES_FILE, controller::CONTROLLER_PROPERTIES_FILE},
         security::KafkaTlsSecurity,
         v1alpha1,
     },
@@ -28,8 +21,6 @@ pub fn broker_kafka_container_commands(
     kafka: &v1alpha1::KafkaCluster,
     cluster_id: &str,
     controller_descriptors: Vec<KafkaPodDescriptor>,
-    kafka_listeners: &KafkaListenerConfig,
-    opa_connect_string: Option<&str>,
     kafka_security: &KafkaTlsSecurity,
     product_version: &str,
 ) -> String {
@@ -51,7 +42,7 @@ pub fn broker_kafka_container_commands(
             true => format!("export KERBEROS_REALM=$(grep -oP 'default_realm = \\K.*' {STACKABLE_KERBEROS_KRB5_PATH})"),
             false => "".to_string(),
         },
-        broker_start_command = broker_start_command(kafka, cluster_id, controller_descriptors, kafka_listeners, opa_connect_string, kafka_security, product_version),
+        broker_start_command = broker_start_command(kafka, cluster_id, controller_descriptors, product_version),
     }
 }
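Aside (editor's illustration, not part of the diff): both roles now share the same copy-template-start startup shape. A minimal sketch of that pattern, using only the `indoc` crate the diff already imports; the helper name `start_script` is made up:

```rust
use indoc::formatdoc;

/// Sketch of the new startup shape: copy the read-only ConfigMap file to a
/// writable location, let `config-utils template` substitute the
/// ${env:...}/${file:...} placeholders, then start Kafka from the rendered file.
fn start_script(config_dir: &str, properties_file: &str) -> String {
    formatdoc! {"
        cp {config_dir}/{properties_file} /tmp/{properties_file}
        config-utils template /tmp/{properties_file}

        bin/kafka-server-start.sh /tmp/{properties_file} &
    "}
}

fn main() {
    println!("{}", start_script("/stackable/config", "broker.properties"));
}
```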
{" - --override \"{client_jaas_config}=com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true isInitiator=false keyTab=\\\"/stackable/kerberos/keytab\\\" principal=\\\"{service_name}/{broker_address}@$KERBEROS_REALM\\\";\" \ - --override \"{bootstrap_jaas_config}=com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true isInitiator=false keyTab=\\\"/stackable/kerberos/keytab\\\" principal=\\\"{service_name}/{bootstrap_address}@$KERBEROS_REALM\\\";\" - ", - client_jaas_config = KafkaListenerName::Client.listener_gssapi_sasl_jaas_config(), - bootstrap_jaas_config = KafkaListenerName::Bootstrap.listener_gssapi_sasl_jaas_config(), - service_name = KafkaRole::Broker.kerberos_service_name(), - broker_address = node_address_cmd(STACKABLE_LISTENER_BROKER_DIR), - bootstrap_address = node_address_cmd(STACKABLE_LISTENER_BOOTSTRAP_DIR), - } - } - false => "".to_string(), - }; - - let client_port = kafka_security.client_port(); - - // TODO: The properties file from the configmap is copied to the /tmp folder and appended with dynamic properties - // This should be improved: - // - mount emptyDir as readWriteConfig - // - use config-utils for proper replacements? - // - should we print the adapted properties file at startup? if kafka.is_controller_configured() { formatdoc! {" - export REPLICA_ID=$(echo \"$POD_NAME\" | grep -oE '[0-9]+$') + POD_INDEX=$(echo \"$POD_NAME\" | grep -oE '[0-9]+$') + export REPLICA_ID=$((POD_INDEX+NODE_ID_OFFSET)) + cp {config_dir}/{properties_file} /tmp/{properties_file} + config-utils template /tmp/{properties_file} - echo \"{KAFKA_NODE_ID}=$((REPLICA_ID + ${KAFKA_NODE_ID_OFFSET}))\" >> /tmp/{properties_file} - echo \"{KAFKA_CONTROLLER_QUORUM_BOOTSTRAP_SERVERS}={bootstrap_servers}\" >> /tmp/{properties_file} - echo \"{KAFKA_LISTENERS}={listeners}\" >> /tmp/{properties_file} - echo \"{KAFKA_ADVERTISED_LISTENERS}={advertised_listeners}\" >> /tmp/{properties_file} - echo \"{KAFKA_LISTENER_SECURITY_PROTOCOL_MAP}={listener_security_protocol_map}\" >> /tmp/{properties_file} - echo \"{KAFKA_CONTROLLER_QUORUM_VOTERS}={controller_quorum_voters}\" >> /tmp/{properties_file} + cp {config_dir}/jaas.properties /tmp/jaas.properties + config-utils template /tmp/jaas.properties bin/kafka-storage.sh format --cluster-id {cluster_id} --config /tmp/{properties_file} --ignore-formatted {initial_controller_command} - bin/kafka-server-start.sh /tmp/{properties_file} {opa_config}{jaas_config} & + bin/kafka-server-start.sh /tmp/{properties_file} & ", config_dir = STACKABLE_CONFIG_DIR, properties_file = BROKER_PROPERTIES_FILE, - bootstrap_servers = to_bootstrap_servers(&controller_descriptors, client_port), - listeners = kafka_listeners.listeners(), - advertised_listeners = kafka_listeners.advertised_listeners(), - listener_security_protocol_map = kafka_listeners.listener_security_protocol_map(), - controller_quorum_voters = to_quorum_voters(&controller_descriptors, client_port), - initial_controller_command = initial_controllers_command(&controller_descriptors, product_version, client_port), + initial_controller_command = initial_controllers_command(&controller_descriptors, product_version), } } else { formatdoc! 
{" - bin/kafka-server-start.sh {config_dir}/{properties_file} \ - --override \"zookeeper.connect=$ZOOKEEPER\" \ - --override \"{KAFKA_LISTENERS}={listeners}\" \ - --override \"{KAFKA_ADVERTISED_LISTENERS}={advertised_listeners}\" \ - --override \"{KAFKA_LISTENER_SECURITY_PROTOCOL_MAP}={listener_security_protocol_map}\" \ - {opa_config} \ - {jaas_config} \ - &", + cp {config_dir}/{properties_file} /tmp/{properties_file} + config-utils template /tmp/{properties_file} + + cp {config_dir}/jaas.properties /tmp/jaas.properties + config-utils template /tmp/jaas.properties + + bin/kafka-server-start.sh /tmp/{properties_file} &", config_dir = STACKABLE_CONFIG_DIR, properties_file = BROKER_PROPERTIES_FILE, - listeners = kafka_listeners.listeners(), - advertised_listeners = kafka_listeners.advertised_listeners(), - listener_security_protocol_map = kafka_listeners.listener_security_protocol_map(), } } } @@ -182,31 +130,20 @@ wait_for_termination() pub fn controller_kafka_container_command( cluster_id: &str, controller_descriptors: Vec, - kafka_listeners: &KafkaListenerConfig, - kafka_security: &KafkaTlsSecurity, product_version: &str, ) -> String { - let client_port = kafka_security.client_port(); - - // TODO: The properties file from the configmap is copied to the /tmp folder and appended with dynamic properties - // This should be improved: - // - mount emptyDir as readWriteConfig - // - use config-utils for proper replacements? - // - should we print the adapted properties file at startup? formatdoc! {" {BASH_TRAP_FUNCTIONS} {remove_vector_shutdown_file_command} prepare_signal_handlers containerdebug --output={STACKABLE_LOG_DIR}/containerdebug-state.json --loop & - export REPLICA_ID=$(echo \"$POD_NAME\" | grep -oE '[0-9]+$') + POD_INDEX=$(echo \"$POD_NAME\" | grep -oE '[0-9]+$') + export REPLICA_ID=$((POD_INDEX+NODE_ID_OFFSET)) + cp {config_dir}/{properties_file} /tmp/{properties_file} - echo \"{KAFKA_NODE_ID}=$((REPLICA_ID + ${KAFKA_NODE_ID_OFFSET}))\" >> /tmp/{properties_file} - echo \"{KAFKA_CONTROLLER_QUORUM_BOOTSTRAP_SERVERS}={bootstrap_servers}\" >> /tmp/{properties_file} - echo \"{KAFKA_LISTENERS}={listeners}\" >> /tmp/{properties_file} - echo \"{KAFKA_LISTENER_SECURITY_PROTOCOL_MAP}={listener_security_protocol_map}\" >> /tmp/{properties_file} - echo \"{KAFKA_CONTROLLER_QUORUM_VOTERS}={controller_quorum_voters}\" >> /tmp/{properties_file} + config-utils template /tmp/{properties_file} bin/kafka-storage.sh format --cluster-id {cluster_id} --config /tmp/{properties_file} --ignore-formatted {initial_controller_command} bin/kafka-server-start.sh /tmp/{properties_file} & @@ -217,50 +154,15 @@ pub fn controller_kafka_container_command( remove_vector_shutdown_file_command = remove_vector_shutdown_file_command(STACKABLE_LOG_DIR), config_dir = STACKABLE_CONFIG_DIR, properties_file = CONTROLLER_PROPERTIES_FILE, - bootstrap_servers = to_bootstrap_servers(&controller_descriptors, client_port), - listeners = to_listeners(client_port), - listener_security_protocol_map = to_listener_security_protocol_map(kafka_listeners), - initial_controller_command = initial_controllers_command(&controller_descriptors, product_version, client_port), - controller_quorum_voters = to_quorum_voters(&controller_descriptors, client_port), + initial_controller_command = initial_controllers_command(&controller_descriptors, product_version), create_vector_shutdown_file_command = create_vector_shutdown_file_command(STACKABLE_LOG_DIR) } } -fn to_listeners(port: u16) -> String { - // The environment variables are set in the 
-fn to_listeners(port: u16) -> String {
-    // The environment variables are set in the statefulset of the controller
-    format!(
-        "{listener_name}://$POD_NAME.$ROLEGROUP_HEADLESS_SERVICE_NAME.$NAMESPACE.svc.$CLUSTER_DOMAIN:{port}",
-        listener_name = KafkaListenerName::Controller
-    )
-}
-
-fn to_listener_security_protocol_map(kafka_listeners: &KafkaListenerConfig) -> String {
-    kafka_listeners
-        .listener_security_protocol_map_for_listener(&KafkaListenerName::Controller)
-        .unwrap_or("".to_string())
-}
-
-fn to_initial_controllers(controller_descriptors: &[KafkaPodDescriptor], port: u16) -> String {
-    controller_descriptors
-        .iter()
-        .map(|desc| desc.as_voter(port))
-        .collect::<Vec<String>>()
-        .join(",")
-}
-
-// TODO: This can be removed once 3.7.2 is removed. Used in command.rs.
-fn to_quorum_voters(controller_descriptors: &[KafkaPodDescriptor], port: u16) -> String {
-    controller_descriptors
-        .iter()
-        .map(|desc| desc.as_quorum_voter(port))
-        .collect::<Vec<String>>()
-        .join(",")
-}
-
-fn to_bootstrap_servers(controller_descriptors: &[KafkaPodDescriptor], port: u16) -> String {
+fn to_initial_controllers(controller_descriptors: &[KafkaPodDescriptor]) -> String {
     controller_descriptors
         .iter()
-        .map(|desc| format!("{fqdn}:{port}", fqdn = desc.fqdn()))
+        .map(|desc| desc.as_voter())
         .collect::<Vec<String>>()
         .join(",")
 }
@@ -268,13 +170,12 @@
 fn initial_controllers_command(
     controller_descriptors: &[KafkaPodDescriptor],
     product_version: &str,
-    client_port: u16,
 ) -> String {
     match product_version.starts_with("3.7") {
         true => "".to_string(),
         false => format!(
             "--initial-controllers {initial_controllers}",
-            initial_controllers = to_initial_controllers(controller_descriptors, client_port),
+            initial_controllers = to_initial_controllers(controller_descriptors),
         ),
     }
 }
diff --git a/rust/operator-binary/src/config/node_id_hasher.rs b/rust/operator-binary/src/config/node_id_hasher.rs
index eebee090..51690a0e 100644
--- a/rust/operator-binary/src/config/node_id_hasher.rs
+++ b/rust/operator-binary/src/config/node_id_hasher.rs
@@ -2,6 +2,12 @@ use stackable_operator::role_utils::RoleGroupRef;
 
 use crate::crd::v1alpha1::KafkaCluster;
 
+/// The Kafka node.id needs to be unique across the Kafka cluster.
+/// This function generates an integer that is stable for a given role group,
+/// regardless of whether it belongs to brokers or controllers.
+/// This integer is then added to the pod index to compute the final node.id.
+/// The node.id is only set and used in KRaft mode.
+/// Warning: this is not safe from collisions.
 pub fn node_id_hash32_offset(rolegroup_ref: &RoleGroupRef<KafkaCluster>) -> u32 {
     let hash = fnv_hash32(&format!(
         "{role}-{rolegroup}",
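Aside (editor's illustration, not part of the diff): `fnv_hash32` itself is not shown here; if it is a standard 32-bit FNV-1a, it would look roughly like this, and per the doc comment above the final node.id is the hash offset plus the pod index:

```rust
/// Standard 32-bit FNV-1a; the repository's fnv_hash32 is assumed to be
/// equivalent but its body is not part of this diff.
fn fnv1a_32(input: &str) -> u32 {
    const FNV_OFFSET_BASIS: u32 = 0x811c_9dc5;
    const FNV_PRIME: u32 = 0x0100_0193;
    input.bytes().fold(FNV_OFFSET_BASIS, |hash, byte| {
        (hash ^ u32::from(byte)).wrapping_mul(FNV_PRIME)
    })
}

fn main() {
    // Stable per-rolegroup offset; pod index 2 then yields offset + 2.
    let offset = fnv1a_32("broker-default");
    println!("node.id for pod 2 = {}", offset.wrapping_add(2));
}
```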
diff --git a/rust/operator-binary/src/crd/listener.rs b/rust/operator-binary/src/crd/listener.rs
index ed2b2100..12b35d62 100644
--- a/rust/operator-binary/src/crd/listener.rs
+++ b/rust/operator-binary/src/crd/listener.rs
@@ -99,13 +99,6 @@ impl KafkaListenerName {
             listener_name = self.to_string().to_lowercase()
         )
     }
-
-    pub fn listener_gssapi_sasl_jaas_config(&self) -> String {
-        format!(
-            "listener.name.{listener_name}.gssapi.sasl.jaas.config",
-            listener_name = self.to_string().to_lowercase()
-        )
-    }
 }
 
 #[derive(Debug)]
@@ -330,21 +323,29 @@ pub fn get_kafka_listener_config(
     })
 }
 
-pub fn node_address_cmd(directory: &str) -> String {
+pub fn node_address_cmd_env(directory: &str) -> String {
     format!("$(cat {directory}/default-address/address)")
 }
 
-pub fn node_port_cmd(directory: &str, port_name: &str) -> String {
+pub fn node_port_cmd_env(directory: &str, port_name: &str) -> String {
     format!("$(cat {directory}/default-address/ports/{port_name})")
 }
 
+pub fn node_address_cmd(directory: &str) -> String {
+    format!("${{file:UTF-8:{directory}/default-address/address}}")
+}
+
+pub fn node_port_cmd(directory: &str, port_name: &str) -> String {
+    format!("${{file:UTF-8:{directory}/default-address/ports/{port_name}}}")
+}
+
 pub fn pod_fqdn(
     kafka: &v1alpha1::KafkaCluster,
     sts_service_name: &str,
     cluster_info: &KubernetesClusterInfo,
 ) -> Result<String, KafkaListenerError> {
     Ok(format!(
-        "$POD_NAME.{sts_service_name}.{namespace}.svc.{cluster_domain}",
+        "${{env:POD_NAME}}.{sts_service_name}.{namespace}.svc.{cluster_domain}",
         namespace = kafka.namespace().context(ObjectHasNoNamespaceSnafu)?,
         cluster_domain = cluster_info.cluster_domain
     ))
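Aside (editor's illustration, not part of the diff): the listener helpers now come in two flavours. A standalone sketch of the strings they produce, with an assumed mount path:

```rust
fn main() {
    let dir = "/stackable/listener-broker"; // assumed value of STACKABLE_LISTENER_BROKER_DIR
    // Shell-evaluated form, kept (as *_env) for the kcat readiness probe:
    let env_form = format!("$(cat {dir}/default-address/address)");
    // Placeholder form, resolved later by `config-utils template`:
    let file_form = format!("${{file:UTF-8:{dir}/default-address/address}}");
    assert_eq!(env_form, "$(cat /stackable/listener-broker/default-address/address)");
    assert_eq!(
        file_form,
        "${file:UTF-8:/stackable/listener-broker/default-address/address}"
    );
}
```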
diff --git a/rust/operator-binary/src/crd/mod.rs b/rust/operator-binary/src/crd/mod.rs
index 9748388c..e8ea4852 100644
--- a/rust/operator-binary/src/crd/mod.rs
+++ b/rust/operator-binary/src/crd/mod.rs
@@ -254,14 +254,15 @@ impl v1alpha1::KafkaCluster {
         })
     }
 
-    /// List all pod descriptors of a provided role expected to form the cluster.
-    ///
+    /// List pod descriptors for a given role and all of its rolegroups.
+    /// If no role is provided, pod descriptors for all roles (and all rolegroups) are listed.
     /// We try to predict the pods here rather than looking at the current cluster state in order to
     /// avoid instance churn.
     pub fn pod_descriptors(
         &self,
-        requested_kafka_role: &KafkaRole,
+        requested_kafka_role: Option<&KafkaRole>,
         cluster_info: &KubernetesClusterInfo,
+        client_port: u16,
     ) -> Result<Vec<KafkaPodDescriptor>, Error> {
         let namespace = self.metadata.namespace.clone().context(NoNamespaceSnafu)?;
         let mut pod_descriptors = Vec::new();
@@ -289,17 +290,19 @@ impl v1alpha1::KafkaCluster {
                 }
             };
 
-            // only return descriptors for selected role
-            if current_role == *requested_kafka_role {
+            // If no specific role is requested, or the current role matches the requested one, add pod descriptors
+            if requested_kafka_role.is_none() || Some(&current_role) == requested_kafka_role {
                 for replica in 0..replicas {
                     pod_descriptors.push(KafkaPodDescriptor {
                         namespace: namespace.clone(),
+                        role: current_role.to_string(),
                         role_group_service_name: rolegroup_ref
                             .rolegroup_headless_service_name(),
                         role_group_statefulset_name: rolegroup_ref.object_name(),
                         replica,
                         cluster_domain: cluster_info.cluster_domain.clone(),
                         node_id: node_id_hash_offset + u32::from(replica),
+                        client_port,
                     });
                 }
             }
@@ -348,6 +351,8 @@ pub struct KafkaPodDescriptor {
     replica: u16,
     cluster_domain: DomainName,
     node_id: u32,
+    pub role: String,
+    pub client_port: u16,
 }
 
 impl KafkaPodDescriptor {
@@ -376,18 +381,20 @@ impl KafkaPodDescriptor {
     /// * controller-0 is the replica's host,
     /// * 1234 is the replica's port.
     // NOTE(@maltesander): Even though the used Uuid states to be type 4 it does not work... 0000000000-00000000000 works...
-    pub fn as_voter(&self, port: u16) -> String {
+    pub fn as_voter(&self) -> String {
         format!(
             "{node_id}@{fqdn}:{port}:0000000000-{node_id:0>11}",
             node_id = self.node_id,
+            port = self.client_port,
             fqdn = self.fqdn(),
         )
     }
 
-    pub fn as_quorum_voter(&self, port: u16) -> String {
+    pub fn as_quorum_voter(&self) -> String {
         format!(
             "{node_id}@{fqdn}:{port}",
             node_id = self.node_id,
+            port = self.client_port,
             fqdn = self.fqdn(),
        )
     }
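Aside (editor's illustration, not part of the diff): the voter string layout documented above, written out with made-up values:

```rust
fn main() {
    let node_id: u32 = 5; // illustrative
    let fqdn = "controller-0.controller-headless.default.svc.cluster.local"; // illustrative
    let port: u16 = 9093; // hypothetical client port
    // Same format string as as_voter(): node_id@fqdn:port:<22-char directory id>
    let voter = format!("{node_id}@{fqdn}:{port}:0000000000-{node_id:0>11}");
    assert_eq!(
        voter,
        "5@controller-0.controller-headless.default.svc.cluster.local:9093:0000000000-00000000005"
    );
}
```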
diff --git a/rust/operator-binary/src/crd/role/broker.rs b/rust/operator-binary/src/crd/role/broker.rs
index 837ddd80..00d614b9 100644
--- a/rust/operator-binary/src/crd/role/broker.rs
+++ b/rust/operator-binary/src/crd/role/broker.rs
@@ -15,11 +15,7 @@ use stackable_operator::{
 use strum::{Display, EnumIter};
 
 use crate::crd::{
-    listener::KafkaListenerName,
-    role::{
-        KAFKA_LOG_DIRS, KAFKA_PROCESS_ROLES, KafkaRole,
-        commons::{CommonConfig, Storage, StorageFragment},
-    },
+    role::commons::{CommonConfig, Storage, StorageFragment},
     v1alpha1,
 };
 
@@ -129,44 +125,11 @@ impl Configuration for BrokerConfigFragment {
 
     fn compute_files(
         &self,
-        resource: &Self::Configurable,
+        _resource: &Self::Configurable,
         _role_name: &str,
-        file: &str,
+        _file: &str,
     ) -> Result<BTreeMap<String, Option<String>>, stackable_operator::product_config_utils::Error> {
-        let mut config = BTreeMap::new();
-
-        if file == BROKER_PROPERTIES_FILE {
-            config.insert(
-                KAFKA_LOG_DIRS.to_string(),
-                Some("/stackable/data/topicdata".to_string()),
-            );
-
-            // KRAFT
-            if resource.is_controller_configured() {
-                config.insert(
-                    KAFKA_PROCESS_ROLES.to_string(),
-                    Some(KafkaRole::Broker.to_string()),
-                );
-
-                config.insert(
-                    "controller.listener.names".to_string(),
-                    Some(KafkaListenerName::Controller.to_string()),
-                );
-            }
-            // OPA
-            if resource.spec.cluster_config.authorization.opa.is_some() {
-                config.insert(
-                    "authorizer.class.name".to_string(),
-                    Some("org.openpolicyagent.kafka.OpaAuthorizer".to_string()),
-                );
-                config.insert(
-                    "opa.authorizer.metrics.enabled".to_string(),
-                    Some("true".to_string()),
-                );
-            }
-        }
-
-        Ok(config)
+        Ok(BTreeMap::new())
     }
 }
diff --git a/rust/operator-binary/src/crd/role/controller.rs b/rust/operator-binary/src/crd/role/controller.rs
index 9be5464f..5b9513a5 100644
--- a/rust/operator-binary/src/crd/role/controller.rs
+++ b/rust/operator-binary/src/crd/role/controller.rs
@@ -15,11 +15,7 @@ use stackable_operator::{
 use strum::{Display, EnumIter};
 
 use crate::crd::{
-    listener::KafkaListenerName,
-    role::{
-        KAFKA_LOG_DIRS, KAFKA_PROCESS_ROLES, KafkaRole,
-        commons::{CommonConfig, Storage, StorageFragment},
-    },
+    role::commons::{CommonConfig, Storage, StorageFragment},
     v1alpha1,
 };
 
@@ -121,29 +117,9 @@ impl Configuration for ControllerConfigFragment {
         &self,
         _resource: &Self::Configurable,
         _role_name: &str,
-        file: &str,
+        _file: &str,
     ) -> Result<BTreeMap<String, Option<String>>, stackable_operator::product_config_utils::Error> {
-        let mut config = BTreeMap::new();
-
-        if file == CONTROLLER_PROPERTIES_FILE {
-            config.insert(
-                KAFKA_LOG_DIRS.to_string(),
-                Some("/stackable/data/kraft".to_string()),
-            );
-
-            // KRAFT
-            config.insert(
-                KAFKA_PROCESS_ROLES.to_string(),
-                Some(KafkaRole::Controller.to_string()),
-            );
-
-            config.insert(
-                "controller.listener.names".to_string(),
-                Some(KafkaListenerName::Controller.to_string()),
-            );
-        }
-
-        Ok(config)
+        Ok(BTreeMap::new())
     }
 }
diff --git a/rust/operator-binary/src/crd/security.rs b/rust/operator-binary/src/crd/security.rs
index 6c482eb8..325f95fc 100644
--- a/rust/operator-binary/src/crd/security.rs
+++ b/rust/operator-binary/src/crd/security.rs
@@ -22,12 +22,12 @@ use stackable_operator::{
     shared::time::Duration,
 };
 
-use super::listener::{KafkaListenerProtocol, node_port_cmd};
+use super::listener::KafkaListenerProtocol;
 use crate::crd::{
     LISTENER_BOOTSTRAP_VOLUME_NAME, LISTENER_BROKER_VOLUME_NAME, STACKABLE_KERBEROS_KRB5_PATH,
     STACKABLE_LISTENER_BROKER_DIR,
     authentication::{self, ResolvedAuthenticationClasses},
-    listener::{self, KafkaListenerName, node_address_cmd},
+    listener::{self, KafkaListenerName, node_address_cmd_env, node_port_cmd_env},
     role::KafkaRole,
     tls, v1alpha1,
 };
@@ -247,7 +247,6 @@ impl KafkaTlsSecurity {
             args.push("-L".to_string());
         } else if self.has_kerberos_enabled() {
             let service_name = KafkaRole::Broker.kerberos_service_name();
-            let broker_port = node_port_cmd(STACKABLE_LISTENER_BROKER_DIR, self.client_port_name());
             // here we need to specify a shell so that variable substitution will work
             // see e.g. https://github.com/kubernetes-client/python/blob/master/kubernetes/docs/V1ExecAction.md
             args.push("/bin/bash".to_string());
@@ -269,13 +268,20 @@ impl KafkaTlsSecurity {
             bash_args.push(
                 format!(
                     "export POD_BROKER_LISTENER_ADDRESS={};",
-                    node_address_cmd(STACKABLE_LISTENER_BROKER_DIR)
+                    node_address_cmd_env(STACKABLE_LISTENER_BROKER_DIR)
+                )
+                .to_string(),
+            );
+            bash_args.push(
+                format!(
+                    "export POD_BROKER_LISTENER_PORT={};",
+                    node_port_cmd_env(STACKABLE_LISTENER_BROKER_DIR, self.client_port_name())
                 )
                 .to_string(),
             );
             bash_args.push("/stackable/kcat".to_string());
             bash_args.push("-b".to_string());
-            bash_args.push(format!("$POD_BROKER_LISTENER_ADDRESS:{broker_port}"));
+            bash_args.push("$POD_BROKER_LISTENER_ADDRESS:$POD_BROKER_LISTENER_PORT".to_string());
             bash_args.extend(Self::kcat_client_sasl_ssl(
                 Self::STACKABLE_TLS_KCAT_DIR,
                 service_name,
diff --git a/rust/operator-binary/src/kafka_controller.rs b/rust/operator-binary/src/kafka_controller.rs
index fbce9c2a..aebe52c3 100644
--- a/rust/operator-binary/src/kafka_controller.rs
+++ b/rust/operator-binary/src/kafka_controller.rs
@@ -37,6 +37,7 @@ use crate::{
     crd::{
         self, APP_NAME, DOCKER_IMAGE_BASE_NAME, JVM_SECURITY_PROPERTIES_FILE, KafkaClusterStatus,
         OPERATOR_NAME,
+        listener::get_kafka_listener_config,
         role::{
             AnyConfig, KafkaRole, broker::BROKER_PROPERTIES_FILE,
             controller::CONTROLLER_PROPERTIES_FILE,
@@ -66,6 +67,14 @@ pub struct Ctx {
 #[strum_discriminants(derive(IntoStaticStr))]
 #[allow(clippy::enum_variant_names)]
 pub enum Error {
+    #[snafu(display("failed to build pod descriptors"))]
+    BuildPodDescriptors { source: crate::crd::Error },
+
+    #[snafu(display("invalid kafka listeners"))]
+    InvalidKafkaListeners {
+        source: crate::crd::listener::KafkaListenerError,
+    },
+
     #[snafu(display("cluster object defines no '{role}' role"))]
     MissingKafkaRole {
         source: crate::crd::Error,
@@ -242,6 +251,8 @@ impl ReconcilerError for Error {
             Error::BuildConfigMap { .. } => None,
             Error::BuildService { .. } => None,
             Error::BuildListener { .. } => None,
+            Error::InvalidKafkaListeners { .. } => None,
+            Error::BuildPodDescriptors { .. } => None,
         }
     }
 }
@@ -359,6 +370,22 @@ pub async fn reconcile_kafka(
             build_rolegroup_metrics_service(kafka, &resolved_product_image, &rolegroup_ref)
                 .context(BuildServiceSnafu)?;
 
+        let kafka_listeners = get_kafka_listener_config(
+            kafka,
+            &kafka_security,
+            &rolegroup_ref,
+            &client.kubernetes_cluster_info,
+        )
+        .context(InvalidKafkaListenersSnafu)?;
+
+        let pod_descriptors = kafka
+            .pod_descriptors(
+                None,
+                &client.kubernetes_cluster_info,
+                kafka_security.client_port(),
+            )
+            .context(BuildPodDescriptorsSnafu)?;
+
         let rg_configmap = build_rolegroup_config_map(
             kafka,
             &resolved_product_image,
@@ -366,6 +393,9 @@ pub async fn reconcile_kafka(
             &rolegroup_ref,
             rolegroup_config,
             &merged_config,
+            &kafka_listeners,
+            &pod_descriptors,
+            opa_connect.as_deref(),
         )
         .context(BuildConfigMapSnafu)?;
 
@@ -376,7 +406,6 @@ pub async fn reconcile_kafka(
             &resolved_product_image,
             &rolegroup_ref,
             rolegroup_config,
-            opa_connect.as_deref(),
             &kafka_security,
             &merged_config,
             &rbac_sa,
diff --git a/rust/operator-binary/src/kerberos.rs b/rust/operator-binary/src/kerberos.rs
index e22de94a..dbde82be 100644
--- a/rust/operator-binary/src/kerberos.rs
+++ b/rust/operator-binary/src/kerberos.rs
@@ -61,7 +61,7 @@ pub fn add_kerberos_pod_config(
         cb.add_env_var("KRB5_CONFIG", STACKABLE_KERBEROS_KRB5_PATH);
         cb.add_env_var(
             "KAFKA_OPTS",
-            format!("-Djava.security.krb5.conf={STACKABLE_KERBEROS_KRB5_PATH}",),
+            format!("-Djava.security.auth.login.config=/tmp/jaas.properties -Djava.security.krb5.conf={STACKABLE_KERBEROS_KRB5_PATH}",),
         );
     }
 }
diff --git a/rust/operator-binary/src/resource/configmap.rs b/rust/operator-binary/src/resource/configmap.rs
index 76368fef..ded83c59 100644
--- a/rust/operator-binary/src/resource/configmap.rs
+++ b/rust/operator-binary/src/resource/configmap.rs
@@ -1,7 +1,11 @@
-use std::collections::{BTreeMap, HashMap};
+use std::{
+    collections::{BTreeMap, HashMap},
+    str::FromStr,
+};
 
+use indoc::formatdoc;
 use product_config::{types::PropertyNameKind, writer::to_java_properties_string};
-use snafu::{ResultExt, Snafu};
+use snafu::{OptionExt, ResultExt, Snafu};
 use stackable_operator::{
     builder::{configmap::ConfigMapBuilder, meta::ObjectMetaBuilder},
     commons::product_image_selection::ResolvedProductImage,
@@ -10,7 +14,18 @@ use stackable_operator::{
 };
 
 use crate::{
-    crd::{JVM_SECURITY_PROPERTIES_FILE, role::AnyConfig, security::KafkaTlsSecurity, v1alpha1},
+    crd::{
+        JVM_SECURITY_PROPERTIES_FILE, KafkaPodDescriptor, STACKABLE_LISTENER_BOOTSTRAP_DIR,
+        STACKABLE_LISTENER_BROKER_DIR,
+        listener::{KafkaListenerConfig, KafkaListenerName, node_address_cmd},
+        role::{
+            AnyConfig, KAFKA_ADVERTISED_LISTENERS, KAFKA_CONTROLLER_QUORUM_BOOTSTRAP_SERVERS,
+            KAFKA_CONTROLLER_QUORUM_VOTERS, KAFKA_LISTENER_SECURITY_PROTOCOL_MAP, KAFKA_LISTENERS,
+            KAFKA_LOG_DIRS, KAFKA_NODE_ID, KAFKA_PROCESS_ROLES, KafkaRole,
+        },
+        security::KafkaTlsSecurity,
+        v1alpha1,
+    },
     kafka_controller::KAFKA_CONTROLLER_NAME,
     operations::graceful_shutdown::graceful_shutdown_config_properties,
     product_logging::extend_role_group_config_map,
@@ -49,9 +64,22 @@ pub enum Error {
         source: product_config::writer::PropertiesWriterError,
         rolegroup: RoleGroupRef<v1alpha1::KafkaCluster>,
     },
+
+    #[snafu(display("no KRaft controllers found"))]
+    NoKraftControllersFound,
+
+    #[snafu(display("unknown Kafka role [{name}]"))]
+    UnknownKafkaRole {
+        source: strum::ParseError,
+        name: String,
+    },
+
+    #[snafu(display("failed to build jaas configuration file for {rolegroup}"))]
+    BuildJaasConfig { rolegroup: String },
 }
 
 /// The rolegroup [`ConfigMap`] configures the rolegroup based on the configuration given by the administrator
+#[allow(clippy::too_many_arguments)]
 pub fn build_rolegroup_config_map(
     kafka: &v1alpha1::KafkaCluster,
     resolved_product_image: &ResolvedProductImage,
@@ -59,13 +87,20 @@ pub fn build_rolegroup_config_map(
     rolegroup: &RoleGroupRef<v1alpha1::KafkaCluster>,
     rolegroup_config: &HashMap<PropertyNameKind, BTreeMap<String, String>>,
     merged_config: &AnyConfig,
+    listener_config: &KafkaListenerConfig,
+    pod_descriptors: &[KafkaPodDescriptor],
+    opa_connect_string: Option<&str>,
 ) -> Result<ConfigMap, Error> {
     let kafka_config_file_name = merged_config.config_file_name();
 
-    let mut kafka_config = rolegroup_config
-        .get(&PropertyNameKind::File(kafka_config_file_name.to_string()))
-        .cloned()
-        .unwrap_or_default();
+    let mut kafka_config = server_properties_file(
+        kafka.is_controller_configured(),
+        &rolegroup.role,
+        pod_descriptors,
+        listener_config,
+        opa_connect_string,
+        resolved_product_image.product_version.starts_with("3.7"), // needs_quorum_voters
+    )?;
 
     match merged_config {
         AnyConfig::Broker(_) => kafka_config.extend(kafka_security.broker_config_settings()),
@@ -76,6 +111,14 @@ pub fn build_rolegroup_config_map(
 
     kafka_config.extend(graceful_shutdown_config_properties());
 
+    // Need to call this to get configOverrides :(
+    kafka_config.extend(
+        rolegroup_config
+            .get(&PropertyNameKind::File(kafka_config_file_name.to_string()))
+            .cloned()
+            .unwrap_or_default(),
+    );
+
     let kafka_config = kafka_config
         .into_iter()
         .map(|(k, v)| (k, Some(v)))
@@ -136,6 +179,14 @@ pub fn build_rolegroup_config_map(
                 .with_context(|_| JvmSecurityPopertiesSnafu {
                     rolegroup: rolegroup.role_group.clone(),
                 })?,
+        )
+        // This file contains the JAAS configuration for Kerberos authentication.
+        // It has the ".properties" extension but is not a Java properties file.
+        // It is processed by `config-utils` to substitute "env:" and "file:" variables,
+        // and that tool currently doesn't support the JAAS login configuration format.
+        .add_data(
+            "jaas.properties",
+            jaas_config_file(kafka_security.has_kerberos_enabled()),
         );
 
     tracing::debug!(?kafka_config, "Applied kafka config");
@@ -154,3 +205,202 @@ pub fn build_rolegroup_config_map(
             rolegroup: rolegroup.clone(),
         })
 }
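Aside (editor's illustration, not part of the diff): the ordering above establishes precedence through repeated `extend` calls; later entries replace earlier ones, which is why user `configOverrides` are merged last. A minimal demonstration:

```rust
use std::collections::BTreeMap;

fn main() {
    // Operator-generated default first ...
    let mut kafka_config = BTreeMap::from([(
        "log.dirs".to_string(),
        "/stackable/data/topicdata".to_string(),
    )]);
    // ... then a user configOverride for the same key wins, because
    // extend() replaces the value of an existing key.
    kafka_config.extend([("log.dirs".to_string(), "/custom/dir".to_string())]);
    assert_eq!(kafka_config["log.dirs"], "/custom/dir");
}
```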
+// Generate the content of both the broker.properties and controller.properties files.
+fn server_properties_file(
+    kraft_mode: bool,
+    role: &str,
+    pod_descriptors: &[KafkaPodDescriptor],
+    listener_config: &KafkaListenerConfig,
+    opa_connect_string: Option<&str>,
+    needs_quorum_voters: bool,
+) -> Result<BTreeMap<String, String>, Error> {
+    let kraft_controllers = kraft_controllers(pod_descriptors);
+
+    let role = KafkaRole::from_str(role).context(UnknownKafkaRoleSnafu {
+        name: role.to_string(),
+    })?;
+
+    match role {
+        KafkaRole::Controller => {
+            let kraft_controllers = kraft_controllers.context(NoKraftControllersFoundSnafu)?;
+
+            let mut result = BTreeMap::from([
+                (
+                    KAFKA_LOG_DIRS.to_string(),
+                    "/stackable/data/kraft".to_string(),
+                ),
+                (KAFKA_PROCESS_ROLES.to_string(), role.to_string()),
+                (
+                    "controller.listener.names".to_string(),
+                    KafkaListenerName::Controller.to_string(),
+                ),
+                (
+                    KAFKA_NODE_ID.to_string(),
+                    "${env:REPLICA_ID}".to_string(),
+                ),
+                (
+                    KAFKA_CONTROLLER_QUORUM_BOOTSTRAP_SERVERS.to_string(),
+                    kraft_controllers.clone(),
+                ),
+                (
+                    KAFKA_LISTENERS.to_string(),
+                    "CONTROLLER://${env:POD_NAME}.${env:ROLEGROUP_HEADLESS_SERVICE_NAME}.${env:NAMESPACE}.svc.${env:CLUSTER_DOMAIN}:${env:KAFKA_CLIENT_PORT}".to_string(),
+                ),
+                (
+                    KAFKA_LISTENER_SECURITY_PROTOCOL_MAP.to_string(),
+                    listener_config
+                        .listener_security_protocol_map_for_listener(&KafkaListenerName::Controller)
+                        .unwrap_or("".to_string()),
+                ),
+            ]);
+
+            if needs_quorum_voters {
+                let kraft_voters =
+                    kraft_voters(pod_descriptors).context(NoKraftControllersFoundSnafu)?;
+
+                result.extend([(KAFKA_CONTROLLER_QUORUM_VOTERS.to_string(), kraft_voters)]);
+            }
+
+            Ok(result)
+        }
+        KafkaRole::Broker => {
+            let mut result = BTreeMap::from([
+                (
+                    KAFKA_LOG_DIRS.to_string(),
+                    "/stackable/data/topicdata".to_string(),
+                ),
+                (KAFKA_LISTENERS.to_string(), listener_config.listeners()),
+                (
+                    KAFKA_ADVERTISED_LISTENERS.to_string(),
+                    listener_config.advertised_listeners(),
+                ),
+                (
+                    KAFKA_LISTENER_SECURITY_PROTOCOL_MAP.to_string(),
+                    listener_config.listener_security_protocol_map(),
+                ),
+            ]);
+
+            if kraft_mode {
+                let kraft_controllers = kraft_controllers.context(NoKraftControllersFoundSnafu)?;
+
+                // Running in KRaft mode
+                result.extend([
+                    (KAFKA_NODE_ID.to_string(), "${env:REPLICA_ID}".to_string()),
+                    (
+                        KAFKA_PROCESS_ROLES.to_string(),
+                        KafkaRole::Broker.to_string(),
+                    ),
+                    (
+                        "controller.listener.names".to_string(),
+                        KafkaListenerName::Controller.to_string(),
+                    ),
+                    (
+                        KAFKA_CONTROLLER_QUORUM_BOOTSTRAP_SERVERS.to_string(),
+                        kraft_controllers.clone(),
+                    ),
+                ]);
+
+                if needs_quorum_voters {
+                    let kraft_voters =
+                        kraft_voters(pod_descriptors).context(NoKraftControllersFoundSnafu)?;
+
+                    result.extend([(KAFKA_CONTROLLER_QUORUM_VOTERS.to_string(), kraft_voters)]);
+                }
+            } else {
+                // Running with ZooKeeper enabled
+                result.extend([(
+                    "zookeeper.connect".to_string(),
+                    "${env:ZOOKEEPER}".to_string(),
+                )]);
+            }
+
+            // Enable OPA authorization
+            if opa_connect_string.is_some() {
+                result.extend([
+                    (
+                        "authorizer.class.name".to_string(),
+                        "org.openpolicyagent.kafka.OpaAuthorizer".to_string(),
+                    ),
+                    (
+                        "opa.authorizer.metrics.enabled".to_string(),
+                        "true".to_string(),
+                    ),
+                    (
+                        "opa.authorizer.url".to_string(),
+                        opa_connect_string.unwrap_or_default().to_string(),
+                    ),
+                ]);
+            }
+
+            Ok(result)
+        }
+    }
+}
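Aside (editor's illustration, not part of the diff): assuming the `KAFKA_*` constants map to the standard Kafka property names, the controller file rendered from the map above looks roughly like this before `config-utils template` substitutes the placeholders (hostname and port invented):

```rust
fn main() {
    let rendered = "\
controller.listener.names=CONTROLLER
controller.quorum.bootstrap.servers=controller-0.controller-headless.default.svc.cluster.local:9093
listeners=CONTROLLER://${env:POD_NAME}.${env:ROLEGROUP_HEADLESS_SERVICE_NAME}.${env:NAMESPACE}.svc.${env:CLUSTER_DOMAIN}:${env:KAFKA_CLIENT_PORT}
log.dirs=/stackable/data/kraft
node.id=${env:REPLICA_ID}
process.roles=controller
";
    assert!(rendered.contains("node.id=${env:REPLICA_ID}"));
}
```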
.join(","); + + if result.is_empty() { + None + } else { + Some(result) + } +} + +fn kraft_voters(pod_descriptors: &[KafkaPodDescriptor]) -> Option { + let result = pod_descriptors + .iter() + .filter(|pd| pd.role == KafkaRole::Controller.to_string()) + .map(|desc| desc.as_quorum_voter()) + .collect::>() + .join(","); + + if result.is_empty() { + None + } else { + Some(result) + } +} + +// Generate JAAS configuration file for Kerberos authentication +// or an empty string if Kerberos is not enabled. +// See https://docs.oracle.com/javase/8/docs/technotes/guides/security/jgss/tutorials/LoginConfigFile.html +fn jaas_config_file(is_kerberos_enabled: bool) -> String { + match is_kerberos_enabled { + false => String::new(), + true => formatdoc! {" + bootstrap.KafkaServer {{ + com.sun.security.auth.module.Krb5LoginModule required + useKeyTab=true + storeKey=true + isInitiator=false + keyTab=\"/stackable/kerberos/keytab\" + principal=\"kafka/{bootstrap_address}@${{env:KERBEROS_REALM}}\"; + }}; + + client.KafkaServer {{ + com.sun.security.auth.module.Krb5LoginModule required + useKeyTab=true + storeKey=true + isInitiator=false + keyTab=\"/stackable/kerberos/keytab\" + principal=\"kafka/{broker_address}@${{env:KERBEROS_REALM}}\"; + }}; + + ", + bootstrap_address = node_address_cmd(STACKABLE_LISTENER_BOOTSTRAP_DIR), + broker_address = node_address_cmd(STACKABLE_LISTENER_BROKER_DIR), + }, + } +} diff --git a/rust/operator-binary/src/resource/statefulset.rs b/rust/operator-binary/src/resource/statefulset.rs index 67343518..89154dad 100644 --- a/rust/operator-binary/src/resource/statefulset.rs +++ b/rust/operator-binary/src/resource/statefulset.rs @@ -52,7 +52,6 @@ use crate::{ LISTENER_BROKER_VOLUME_NAME, LOG_DIRS_VOLUME_NAME, METRICS_PORT, METRICS_PORT_NAME, STACKABLE_CONFIG_DIR, STACKABLE_DATA_DIR, STACKABLE_LISTENER_BOOTSTRAP_DIR, STACKABLE_LISTENER_BROKER_DIR, - listener::get_kafka_listener_config, role::{ AnyConfig, KAFKA_NODE_ID_OFFSET, KafkaRole, broker::BrokerContainer, controller::ControllerContainer, @@ -169,7 +168,6 @@ pub fn build_broker_rolegroup_statefulset( resolved_product_image: &ResolvedProductImage, rolegroup_ref: &RoleGroupRef, broker_config: &HashMap>, - opa_connect_string: Option<&str>, kafka_security: &KafkaTlsSecurity, merged_config: &AnyConfig, service_account: &ServiceAccount, @@ -285,10 +283,6 @@ pub fn build_broker_rolegroup_statefulset( ..EnvVar::default() }); - let kafka_listeners = - get_kafka_listener_config(kafka, kafka_security, rolegroup_ref, cluster_info) - .context(InvalidKafkaListenersSnafu)?; - let cluster_id = kafka.cluster_id().context(ClusterIdMissingSnafu)?; cb_kafka @@ -305,10 +299,12 @@ pub fn build_broker_rolegroup_statefulset( cluster_id, // we need controller pods kafka - .pod_descriptors(&KafkaRole::Controller, cluster_info) + .pod_descriptors( + Some(&KafkaRole::Controller), + cluster_info, + kafka_security.client_port(), + ) .context(BuildPodDescriptorsSnafu)?, - &kafka_listeners, - opa_connect_string, kafka_security, &resolved_product_image.product_version, )]) @@ -337,6 +333,10 @@ pub fn build_broker_rolegroup_statefulset( KAFKA_NODE_ID_OFFSET, node_id_hash32_offset(rolegroup_ref).to_string(), ) + .add_env_var( + "KAFKA_CLIENT_PORT".to_string(), + kafka_security.client_port().to_string(), + ) .add_env_vars(env) .add_container_ports(container_ports(kafka_security)) .add_volume_mount(LOG_DIRS_VOLUME_NAME, STACKABLE_DATA_DIR) @@ -628,9 +628,11 @@ pub fn build_controller_rolegroup_statefulset( ..EnvVar::default() }); - let kafka_listeners = - 
@@ -628,9 +628,11 @@ pub fn build_controller_rolegroup_statefulset(
         ..EnvVar::default()
     });
 
-    let kafka_listeners =
-        get_kafka_listener_config(kafka, kafka_security, rolegroup_ref, cluster_info)
-            .context(InvalidKafkaListenersSnafu)?;
+    env.push(EnvVar {
+        name: "KAFKA_CLIENT_PORT".to_string(),
+        value: Some(kafka_security.client_port().to_string()),
+        ..EnvVar::default()
+    });
 
     cb_kafka
         .image_from_product_image(resolved_product_image)
@@ -644,10 +646,8 @@ pub fn build_controller_rolegroup_statefulset(
         .args(vec![controller_kafka_container_command(
             kafka.cluster_id().context(ClusterIdMissingSnafu)?,
             kafka
-                .pod_descriptors(kafka_role, cluster_info)
+                .pod_descriptors(Some(kafka_role), cluster_info, kafka_security.client_port())
                 .context(BuildPodDescriptorsSnafu)?,
-            &kafka_listeners,
-            kafka_security,
             &resolved_product_image.product_version,
         )])
         .add_env_var("PRE_STOP_CONTROLLER_SLEEP_SECONDS", "10")
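Aside (editor's illustration, not part of the diff): `pod_descriptors` now takes an `Option<&KafkaRole>`; the statefulset builders above still request a single role, while the ConfigMap path passes `None` to collect every role. The filter condition in miniature:

```rust
#[derive(Clone, Debug, PartialEq)]
enum Role {
    Broker,
    Controller,
}

// Mirrors `requested_kafka_role.is_none() || Some(&current_role) == requested_kafka_role`.
fn select_roles(all: &[Role], requested: Option<&Role>) -> Vec<Role> {
    all.iter()
        .filter(|role| requested.is_none() || Some(*role) == requested)
        .cloned()
        .collect()
}

fn main() {
    let all = [Role::Broker, Role::Controller];
    assert_eq!(select_roles(&all, None).len(), 2);
    assert_eq!(select_roles(&all, Some(&Role::Controller)), [Role::Controller]);
}
```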