From 836006820ee5f0b27771f75aa2daccc0eea5c9ee Mon Sep 17 00:00:00 2001 From: Razvan-Daniel Mihai <84674+razvan@users.noreply.github.com> Date: Thu, 13 Nov 2025 16:32:14 +0100 Subject: [PATCH] refactor: only two Kafka listeners CLIENT and INTERNAL --- rust/operator-binary/src/crd/listener.rs | 262 ++++-------------- rust/operator-binary/src/crd/role/mod.rs | 6 +- rust/operator-binary/src/crd/security.rs | 212 +++++--------- .../operator-binary/src/resource/configmap.rs | 4 +- rust/operator-binary/src/resource/service.rs | 2 +- .../src/resource/statefulset.rs | 7 +- 6 files changed, 143 insertions(+), 350 deletions(-) diff --git a/rust/operator-binary/src/crd/listener.rs b/rust/operator-binary/src/crd/listener.rs index 12b35d62..0695dc6a 100644 --- a/rust/operator-binary/src/crd/listener.rs +++ b/rust/operator-binary/src/crd/listener.rs @@ -162,159 +162,55 @@ impl Display for KafkaListener { } } +// Builds two Kafka listeners: CLIENT and INTERNAL. +// The INTERNAL listener binds *and* advertises the corresponding pod of the headless service. +// The CLIENT listener binds to 0.0.0.0 and advertises the corresponding "broker listener" service. +// Every broker in the cluster has its own broker listener service created by the listener operator. +// TODO: This function is tightly coupled with `crd::security::KafkaTlsSecurity::broker_listener_tls_properties`. pub fn get_kafka_listener_config( kafka: &v1alpha1::KafkaCluster, kafka_security: &KafkaTlsSecurity, - rolegroup_ref: &RoleGroupRef, + _rolegroup_ref: &RoleGroupRef, cluster_info: &KubernetesClusterInfo, ) -> Result { - let pod_fqdn = pod_fqdn( - kafka, - &rolegroup_ref.rolegroup_headless_service_name(), - cluster_info, - )?; + let pod_fqdn = internal_pod_fqdn(kafka, cluster_info)?; let mut listeners = vec![]; let mut advertised_listeners = vec![]; let mut listener_security_protocol_map: BTreeMap = BTreeMap::new(); - // CLIENT - if kafka_security.tls_client_authentication_class().is_some() { - // 1) If client authentication required, we expose only CLIENT_AUTH connection with SSL - listeners.push(KafkaListener { - name: KafkaListenerName::ClientAuth, - host: LISTENER_LOCAL_ADDRESS.to_string(), - port: kafka_security.client_port().to_string(), - }); - advertised_listeners.push(KafkaListener { - name: KafkaListenerName::ClientAuth, - host: node_address_cmd(STACKABLE_LISTENER_BROKER_DIR), - port: node_port_cmd( - STACKABLE_LISTENER_BROKER_DIR, - kafka_security.client_port_name(), - ), - }); - listener_security_protocol_map - .insert(KafkaListenerName::ClientAuth, KafkaListenerProtocol::Ssl); - listener_security_protocol_map.insert( - KafkaListenerName::ControllerAuth, - KafkaListenerProtocol::Ssl, - ); - } else if kafka_security.has_kerberos_enabled() { - // 2) Kerberos and TLS authentication classes are mutually exclusive - listeners.push(KafkaListener { - name: KafkaListenerName::Client, - host: LISTENER_LOCAL_ADDRESS.to_string(), - port: KafkaTlsSecurity::SECURE_CLIENT_PORT.to_string(), - }); - advertised_listeners.push(KafkaListener { - name: KafkaListenerName::Client, - host: node_address_cmd(STACKABLE_LISTENER_BROKER_DIR), - port: node_port_cmd( - STACKABLE_LISTENER_BROKER_DIR, - kafka_security.client_port_name(), - ), - }); - listener_security_protocol_map - .insert(KafkaListenerName::Client, KafkaListenerProtocol::SaslSsl); - listener_security_protocol_map.insert( - KafkaListenerName::Controller, - KafkaListenerProtocol::SaslSsl, - ); - } else if kafka_security.tls_server_secret_class().is_some() { - // 3) If no client authentication but tls is required we expose CLIENT with SSL - listeners.push(KafkaListener { - name: KafkaListenerName::Client, - host: LISTENER_LOCAL_ADDRESS.to_string(), - port: kafka_security.client_port().to_string(), - }); - advertised_listeners.push(KafkaListener { - name: KafkaListenerName::Client, - host: node_address_cmd(STACKABLE_LISTENER_BROKER_DIR), - port: node_port_cmd( - STACKABLE_LISTENER_BROKER_DIR, - kafka_security.client_port_name(), - ), - }); - listener_security_protocol_map - .insert(KafkaListenerName::Client, KafkaListenerProtocol::Ssl); - } else { - // 4) If no client auth or tls is required we expose CLIENT with PLAINTEXT - listeners.push(KafkaListener { - name: KafkaListenerName::Client, - host: LISTENER_LOCAL_ADDRESS.to_string(), - port: KafkaTlsSecurity::CLIENT_PORT.to_string(), - }); - advertised_listeners.push(KafkaListener { - name: KafkaListenerName::Client, - host: node_address_cmd(STACKABLE_LISTENER_BROKER_DIR), - port: node_port_cmd( - STACKABLE_LISTENER_BROKER_DIR, - kafka_security.client_port_name(), - ), - }); - listener_security_protocol_map - .insert(KafkaListenerName::Client, KafkaListenerProtocol::Plaintext); - } - - // INTERNAL / CONTROLLER - if kafka_security.has_kerberos_enabled() || kafka_security.tls_internal_secret_class().is_some() - { - // 5) & 6) Kerberos and TLS authentication classes are mutually exclusive but both require internal tls to be used - listeners.push(KafkaListener { - name: KafkaListenerName::Internal, - host: LISTENER_LOCAL_ADDRESS.to_string(), - port: KafkaTlsSecurity::SECURE_INTERNAL_PORT.to_string(), - }); - advertised_listeners.push(KafkaListener { - name: KafkaListenerName::Internal, - host: pod_fqdn.to_string(), - port: KafkaTlsSecurity::SECURE_INTERNAL_PORT.to_string(), - }); - listener_security_protocol_map - .insert(KafkaListenerName::Internal, KafkaListenerProtocol::Ssl); - listener_security_protocol_map - .insert(KafkaListenerName::Controller, KafkaListenerProtocol::Ssl); - } else { - // 7) If no internal tls is required we expose INTERNAL as PLAINTEXT - listeners.push(KafkaListener { - name: KafkaListenerName::Internal, - host: LISTENER_LOCAL_ADDRESS.to_string(), - port: kafka_security.internal_port().to_string(), - }); - advertised_listeners.push(KafkaListener { - name: KafkaListenerName::Internal, - host: pod_fqdn.to_string(), - port: kafka_security.internal_port().to_string(), - }); - listener_security_protocol_map.insert( - KafkaListenerName::Internal, - KafkaListenerProtocol::Plaintext, - ); - listener_security_protocol_map.insert( - KafkaListenerName::Controller, - KafkaListenerProtocol::Plaintext, - ); - } - - // BOOTSTRAP - if kafka_security.has_kerberos_enabled() { - listeners.push(KafkaListener { - name: KafkaListenerName::Bootstrap, - host: LISTENER_LOCAL_ADDRESS.to_string(), - port: kafka_security.bootstrap_port().to_string(), - }); - advertised_listeners.push(KafkaListener { - name: KafkaListenerName::Bootstrap, - host: node_address_cmd(STACKABLE_LISTENER_BROKER_DIR), - port: node_port_cmd( - STACKABLE_LISTENER_BROKER_DIR, - kafka_security.client_port_name(), - ), - }); - listener_security_protocol_map - .insert(KafkaListenerName::Bootstrap, KafkaListenerProtocol::SaslSsl); - } + // CLIENT (advertised) listener + listeners.push(KafkaListener { + name: KafkaListenerName::Client, + host: LISTENER_LOCAL_ADDRESS.to_string(), + port: kafka_security.client_port().to_string(), + }); + advertised_listeners.push(KafkaListener { + name: KafkaListenerName::Client, + host: node_address_cmd(STACKABLE_LISTENER_BROKER_DIR), + port: node_port_cmd( + STACKABLE_LISTENER_BROKER_DIR, + kafka_security.client_port_name(), + ), + }); + listener_security_protocol_map + .insert(KafkaListenerName::Client, kafka_security.client_protocol()); + + // INTERNAL (advertised) listener + listeners.push(KafkaListener { + name: KafkaListenerName::Internal, + host: pod_fqdn.to_string(), + port: kafka_security.internal_port().to_string(), + }); + advertised_listeners.push(KafkaListener { + name: KafkaListenerName::Internal, + host: pod_fqdn.to_string(), + port: kafka_security.internal_port().to_string(), + }); + listener_security_protocol_map.insert( + KafkaListenerName::Internal, + kafka_security.internal_protocol(), + ); Ok(KafkaListenerConfig { listeners, @@ -339,13 +235,12 @@ pub fn node_port_cmd(directory: &str, port_name: &str) -> String { format!("${{file:UTF-8:{directory}/default-address/ports/{port_name}}}") } -pub fn pod_fqdn( +pub fn internal_pod_fqdn( kafka: &v1alpha1::KafkaCluster, - sts_service_name: &str, cluster_info: &KubernetesClusterInfo, ) -> Result { Ok(format!( - "${{env:POD_NAME}}.{sts_service_name}.{namespace}.svc.{cluster_domain}", + "${{env:POD_NAME}}.${{env:ROLEGROUP_HEADLESS_SERVICE_NAME}}.{namespace}.svc.{cluster_domain}", namespace = kafka.namespace().context(ObjectHasNoNamespaceSnafu)?, cluster_domain = cluster_info.cluster_domain )) @@ -414,11 +309,11 @@ mod tests { config.listeners(), format!( "{name}://{host}:{port},{internal_name}://{internal_host}:{internal_port}", - name = KafkaListenerName::ClientAuth, + name = KafkaListenerName::Client, host = LISTENER_LOCAL_ADDRESS, port = kafka_security.client_port(), internal_name = KafkaListenerName::Internal, - internal_host = LISTENER_LOCAL_ADDRESS, + internal_host = internal_pod_fqdn(&kafka, &cluster_info).unwrap(), internal_port = kafka_security.internal_port(), ) ); @@ -427,19 +322,14 @@ mod tests { config.advertised_listeners(), format!( "{name}://{host}:{port},{internal_name}://{internal_host}:{internal_port}", - name = KafkaListenerName::ClientAuth, + name = KafkaListenerName::Client, host = node_address_cmd(STACKABLE_LISTENER_BROKER_DIR), port = node_port_cmd( STACKABLE_LISTENER_BROKER_DIR, kafka_security.client_port_name() ), internal_name = KafkaListenerName::Internal, - internal_host = pod_fqdn( - &kafka, - &rolegroup_ref.rolegroup_headless_service_name(), - &cluster_info - ) - .unwrap(), + internal_host = internal_pod_fqdn(&kafka, &cluster_info).unwrap(), internal_port = kafka_security.internal_port(), ) ); @@ -447,15 +337,11 @@ mod tests { assert_eq!( config.listener_security_protocol_map(), format!( - "{name}:{protocol},{internal_name}:{internal_protocol},{controller_name}:{controller_protocol},{controller_auth_name}:{controller_auth_protocol}", - name = KafkaListenerName::ClientAuth, + "{name}:{protocol},{internal_name}:{internal_protocol}", + name = KafkaListenerName::Client, protocol = KafkaListenerProtocol::Ssl, internal_name = KafkaListenerName::Internal, internal_protocol = KafkaListenerProtocol::Ssl, - controller_name = KafkaListenerName::Controller, - controller_protocol = KafkaListenerProtocol::Ssl, - controller_auth_name = KafkaListenerName::ControllerAuth, - controller_auth_protocol = KafkaListenerProtocol::Ssl, ) ); @@ -476,7 +362,7 @@ mod tests { host = LISTENER_LOCAL_ADDRESS, port = kafka_security.client_port(), internal_name = KafkaListenerName::Internal, - internal_host = LISTENER_LOCAL_ADDRESS, + internal_host = internal_pod_fqdn(&kafka, &cluster_info).unwrap(), internal_port = kafka_security.internal_port(), ) ); @@ -492,12 +378,7 @@ mod tests { kafka_security.client_port_name() ), internal_name = KafkaListenerName::Internal, - internal_host = pod_fqdn( - &kafka, - &rolegroup_ref.rolegroup_headless_service_name(), - &cluster_info - ) - .unwrap(), + internal_host = internal_pod_fqdn(&kafka, &cluster_info).unwrap(), internal_port = kafka_security.internal_port(), ) ); @@ -505,13 +386,11 @@ mod tests { assert_eq!( config.listener_security_protocol_map(), format!( - "{name}:{protocol},{internal_name}:{internal_protocol},{controller_name}:{controller_protocol}", + "{name}:{protocol},{internal_name}:{internal_protocol}", name = KafkaListenerName::Client, protocol = KafkaListenerProtocol::Ssl, internal_name = KafkaListenerName::Internal, internal_protocol = KafkaListenerProtocol::Ssl, - controller_name = KafkaListenerName::Controller, - controller_protocol = KafkaListenerProtocol::Ssl, ) ); @@ -533,7 +412,7 @@ mod tests { host = LISTENER_LOCAL_ADDRESS, port = kafka_security.client_port(), internal_name = KafkaListenerName::Internal, - internal_host = LISTENER_LOCAL_ADDRESS, + internal_host = internal_pod_fqdn(&kafka, &cluster_info).unwrap(), internal_port = kafka_security.internal_port(), ) ); @@ -549,12 +428,7 @@ mod tests { kafka_security.client_port_name() ), internal_name = KafkaListenerName::Internal, - internal_host = pod_fqdn( - &kafka, - &rolegroup_ref.rolegroup_headless_service_name(), - &cluster_info - ) - .unwrap(), + internal_host = internal_pod_fqdn(&kafka, &cluster_info).unwrap(), internal_port = kafka_security.internal_port(), ) ); @@ -562,13 +436,11 @@ mod tests { assert_eq!( config.listener_security_protocol_map(), format!( - "{name}:{protocol},{internal_name}:{internal_protocol},{controller_name}:{controller_protocol}", + "{name}:{protocol},{internal_name}:{internal_protocol}", name = KafkaListenerName::Client, protocol = KafkaListenerProtocol::Plaintext, internal_name = KafkaListenerName::Internal, internal_protocol = KafkaListenerProtocol::Plaintext, - controller_name = KafkaListenerName::Controller, - controller_protocol = KafkaListenerProtocol::Plaintext, ) ); } @@ -617,23 +489,20 @@ mod tests { assert_eq!( config.listeners(), format!( - "{name}://{host}:{port},{internal_name}://{internal_host}:{internal_port},{bootstrap_name}://{bootstrap_host}:{bootstrap_port}", + "{name}://{host}:{port},{internal_name}://{internal_host}:{internal_port}", name = KafkaListenerName::Client, host = LISTENER_LOCAL_ADDRESS, port = kafka_security.client_port(), internal_name = KafkaListenerName::Internal, - internal_host = LISTENER_LOCAL_ADDRESS, + internal_host = internal_pod_fqdn(&kafka, &cluster_info).unwrap(), internal_port = kafka_security.internal_port(), - bootstrap_name = KafkaListenerName::Bootstrap, - bootstrap_host = LISTENER_LOCAL_ADDRESS, - bootstrap_port = kafka_security.bootstrap_port(), ) ); assert_eq!( config.advertised_listeners(), format!( - "{name}://{host}:{port},{internal_name}://{internal_host}:{internal_port},{bootstrap_name}://{bootstrap_host}:{bootstrap_port}", + "{name}://{host}:{port},{internal_name}://{internal_host}:{internal_port}", name = KafkaListenerName::Client, host = node_address_cmd(STACKABLE_LISTENER_BROKER_DIR), port = node_port_cmd( @@ -641,34 +510,19 @@ mod tests { kafka_security.client_port_name() ), internal_name = KafkaListenerName::Internal, - internal_host = pod_fqdn( - &kafka, - &rolegroup_ref.rolegroup_headless_service_name(), - &cluster_info - ) - .unwrap(), + internal_host = internal_pod_fqdn(&kafka, &cluster_info).unwrap(), internal_port = kafka_security.internal_port(), - bootstrap_name = KafkaListenerName::Bootstrap, - bootstrap_host = node_address_cmd(STACKABLE_LISTENER_BROKER_DIR), - bootstrap_port = node_port_cmd( - STACKABLE_LISTENER_BROKER_DIR, - kafka_security.client_port_name() - ), ) ); assert_eq!( config.listener_security_protocol_map(), format!( - "{name}:{protocol},{internal_name}:{internal_protocol},{bootstrap_name}:{bootstrap_protocol},{controller_name}:{controller_protocol}", + "{name}:{protocol},{internal_name}:{internal_protocol}", name = KafkaListenerName::Client, protocol = KafkaListenerProtocol::SaslSsl, internal_name = KafkaListenerName::Internal, internal_protocol = KafkaListenerProtocol::Ssl, - bootstrap_name = KafkaListenerName::Bootstrap, - bootstrap_protocol = KafkaListenerProtocol::SaslSsl, - controller_name = KafkaListenerName::Controller, - controller_protocol = KafkaListenerProtocol::Ssl, ) ); } diff --git a/rust/operator-binary/src/crd/role/mod.rs b/rust/operator-binary/src/crd/role/mod.rs index 47210ea4..c48ee8cc 100644 --- a/rust/operator-binary/src/crd/role/mod.rs +++ b/rust/operator-binary/src/crd/role/mod.rs @@ -432,7 +432,11 @@ impl AnyConfig { } } - pub fn listener_class(&self) -> Option<&String> { + // This returns the listener class that exposes Kafka to clients. + // This corresponds to the CLIENT Kafka listener. + // This class is used to build one service per server (broker) pod. + // For Kraft controllers, there is no such listener class. + pub fn client_listener_class(&self) -> Option<&String> { match self { AnyConfig::Broker(broker_config) => Some(&broker_config.broker_listener_class), AnyConfig::Controller(_) => None, diff --git a/rust/operator-binary/src/crd/security.rs b/rust/operator-binary/src/crd/security.rs index 325f95fc..09b0fce5 100644 --- a/rust/operator-binary/src/crd/security.rs +++ b/rust/operator-binary/src/crd/security.rs @@ -204,6 +204,17 @@ impl KafkaTlsSecurity { } } + /// Return the Kafka (secure) client protocol + pub fn client_protocol(&self) -> KafkaListenerProtocol { + if self.has_kerberos_enabled() { + KafkaListenerProtocol::SaslSsl + } else if self.tls_enabled() { + KafkaListenerProtocol::Ssl + } else { + KafkaListenerProtocol::Plaintext + } + } + pub fn bootstrap_port(&self) -> u16 { if self.tls_enabled() { Self::SECURE_BOOTSTRAP_PORT @@ -234,74 +245,71 @@ impl KafkaTlsSecurity { } } + /// Return the Kafka (secure) internal protocol + pub fn internal_protocol(&self) -> KafkaListenerProtocol { + if self.tls_internal_secret_class().is_some() { + KafkaListenerProtocol::Ssl + } else { + KafkaListenerProtocol::Plaintext + } + } + /// Returns the commands for the kcat readiness probe. + /// kcat will use the CLIENT listener to connect to the Kafka broker. pub fn kcat_prober_container_commands(&self) -> Vec { - let mut args = vec![]; - let port = self.client_port(); + let mut args = vec![ + "/bin/bash".to_string(), + "-x".to_string(), + "-euo".to_string(), + "pipefail".to_string(), + "-c".to_string(), + ]; + // the entire command needs to be subject to the -c directive + // to prevent short-circuiting + let mut bash_args = vec![]; + bash_args.push( + format!( + "export POD_BROKER_LISTENER_ADDRESS={};", + node_address_cmd_env(STACKABLE_LISTENER_BROKER_DIR) + ) + .to_string(), + ); + bash_args.push( + format!( + "export POD_BROKER_LISTENER_PORT={};", + node_port_cmd_env(STACKABLE_LISTENER_BROKER_DIR, self.client_port_name()) + ) + .to_string(), + ); + bash_args.push( + format!( + "export KERBEROS_REALM=$(grep -oP 'default_realm = \\K.*' {});", + STACKABLE_KERBEROS_KRB5_PATH + ) + .to_string(), + ); + + bash_args.push("/stackable/kcat".to_string()); + bash_args.push("-b".to_string()); + + bash_args.push("$POD_BROKER_LISTENER_ADDRESS:$POD_BROKER_LISTENER_PORT".to_string()); if self.tls_client_authentication_class().is_some() { - args.push("/stackable/kcat".to_string()); - args.push("-b".to_string()); - args.push(format!("localhost:{}", port)); - args.extend(Self::kcat_client_auth_ssl(Self::STACKABLE_TLS_KCAT_DIR)); - args.push("-L".to_string()); + bash_args.extend(Self::kcat_client_auth_ssl(Self::STACKABLE_TLS_KCAT_DIR)); + } else if self.tls_server_secret_class().is_some() { + bash_args.extend(Self::kcat_client_ssl(Self::STACKABLE_TLS_KCAT_DIR)); } else if self.has_kerberos_enabled() { let service_name = KafkaRole::Broker.kerberos_service_name(); - // here we need to specify a shell so that variable substitution will work - // see e.g. https://github.com/kubernetes-client/python/blob/master/kubernetes/docs/V1ExecAction.md - args.push("/bin/bash".to_string()); - args.push("-x".to_string()); - args.push("-euo".to_string()); - args.push("pipefail".to_string()); - args.push("-c".to_string()); - - // the entire command needs to be subject to the -c directive - // to prevent short-circuiting - let mut bash_args = vec![]; - bash_args.push( - format!( - "export KERBEROS_REALM=$(grep -oP 'default_realm = \\K.*' {});", - STACKABLE_KERBEROS_KRB5_PATH - ) - .to_string(), - ); - bash_args.push( - format!( - "export POD_BROKER_LISTENER_ADDRESS={};", - node_address_cmd_env(STACKABLE_LISTENER_BROKER_DIR) - ) - .to_string(), - ); - bash_args.push( - format!( - "export POD_BROKER_LISTENER_PORT={};", - node_port_cmd_env(STACKABLE_LISTENER_BROKER_DIR, self.client_port_name()) - ) - .to_string(), - ); - bash_args.push("/stackable/kcat".to_string()); - bash_args.push("-b".to_string()); - bash_args.push("$POD_BROKER_LISTENER_ADDRESS:$POD_BROKER_LISTENER_PORT".to_string()); bash_args.extend(Self::kcat_client_sasl_ssl( Self::STACKABLE_TLS_KCAT_DIR, service_name, )); - bash_args.push("-L".to_string()); - - args.push(bash_args.join(" ")); - } else if self.tls_server_secret_class().is_some() { - args.push("/stackable/kcat".to_string()); - args.push("-b".to_string()); - args.push(format!("localhost:{}", port)); - args.extend(Self::kcat_client_ssl(Self::STACKABLE_TLS_KCAT_DIR)); - args.push("-L".to_string()); - } else { - args.push("/stackable/kcat".to_string()); - args.push("-b".to_string()); - args.push(format!("localhost:{}", port)); - args.push("-L".to_string()); } + bash_args.push("-L".to_string()); + + args.push(bash_args.join(" ")); + args } @@ -523,45 +531,17 @@ impl KafkaTlsSecurity { Ok(()) } - /// Returns required Kafka configuration settings for the `broker.properties` file - /// depending on the tls and authentication settings. - pub fn broker_config_settings(&self) -> BTreeMap { + // Builds the listener security properties for the two Kafka listeners: CLIENT and INTERNAL + // TODO: This function is tightly coupled with `crd::listener::get_kafka_listener_config()` + pub fn broker_listener_tls_properties(&self) -> BTreeMap { let mut config = BTreeMap::new(); // We set either client tls with authentication or client tls without authentication // If authentication is explicitly required we do not want to have any other CAs to // be trusted. - if self.tls_client_authentication_class().is_some() { - config.insert( - KafkaListenerName::ClientAuth.listener_ssl_keystore_location(), - format!("{}/keystore.p12", Self::STACKABLE_TLS_KAFKA_SERVER_DIR), - ); - config.insert( - KafkaListenerName::ClientAuth.listener_ssl_keystore_password(), - Self::SSL_STORE_PASSWORD.to_string(), - ); - config.insert( - KafkaListenerName::ClientAuth.listener_ssl_keystore_type(), - "PKCS12".to_string(), - ); - config.insert( - KafkaListenerName::ClientAuth.listener_ssl_truststore_location(), - format!("{}/truststore.p12", Self::STACKABLE_TLS_KAFKA_SERVER_DIR), - ); - config.insert( - KafkaListenerName::ClientAuth.listener_ssl_truststore_password(), - Self::SSL_STORE_PASSWORD.to_string(), - ); - config.insert( - KafkaListenerName::ClientAuth.listener_ssl_truststore_type(), - "PKCS12".to_string(), - ); - // client auth required - config.insert( - KafkaListenerName::ClientAuth.listener_ssl_client_auth(), - "required".to_string(), - ); - } else if self.tls_server_secret_class().is_some() { + if self.tls_server_secret_class().is_some() + || self.tls_client_authentication_class().is_some() + { config.insert( KafkaListenerName::Client.listener_ssl_keystore_location(), format!("{}/keystore.p12", Self::STACKABLE_TLS_KAFKA_SERVER_DIR), @@ -589,31 +569,6 @@ impl KafkaTlsSecurity { } if self.has_kerberos_enabled() { - // Bootstrap - config.insert( - KafkaListenerName::Bootstrap.listener_ssl_keystore_location(), - format!("{}/keystore.p12", Self::STACKABLE_TLS_KAFKA_SERVER_DIR), - ); - config.insert( - KafkaListenerName::Bootstrap.listener_ssl_keystore_password(), - Self::SSL_STORE_PASSWORD.to_string(), - ); - config.insert( - KafkaListenerName::Bootstrap.listener_ssl_keystore_type(), - "PKCS12".to_string(), - ); - config.insert( - KafkaListenerName::Bootstrap.listener_ssl_truststore_location(), - format!("{}/truststore.p12", Self::STACKABLE_TLS_KAFKA_SERVER_DIR), - ); - config.insert( - KafkaListenerName::Bootstrap.listener_ssl_truststore_password(), - Self::SSL_STORE_PASSWORD.to_string(), - ); - config.insert( - KafkaListenerName::Bootstrap.listener_ssl_truststore_type(), - "PKCS12".to_string(), - ); config.insert("sasl.enabled.mechanisms".to_string(), "GSSAPI".to_string()); config.insert( "sasl.kerberos.service.name".to_string(), @@ -623,12 +578,10 @@ impl KafkaTlsSecurity { "sasl.mechanism.inter.broker.protocol".to_string(), "GSSAPI".to_string(), ); - tracing::debug!("Kerberos configs added: [{:#?}]", config); } // Internal TLS if self.tls_internal_secret_class().is_some() { - // BROKERS config.insert( KafkaListenerName::Internal.listener_ssl_keystore_location(), format!("{}/keystore.p12", Self::STACKABLE_TLS_KAFKA_INTERNAL_DIR), @@ -653,31 +606,6 @@ impl KafkaTlsSecurity { KafkaListenerName::Internal.listener_ssl_truststore_type(), "PKCS12".to_string(), ); - // CONTROLLERS - config.insert( - KafkaListenerName::Controller.listener_ssl_keystore_location(), - format!("{}/keystore.p12", Self::STACKABLE_TLS_KAFKA_INTERNAL_DIR), - ); - config.insert( - KafkaListenerName::Controller.listener_ssl_keystore_password(), - Self::SSL_STORE_PASSWORD.to_string(), - ); - config.insert( - KafkaListenerName::Controller.listener_ssl_keystore_type(), - "PKCS12".to_string(), - ); - config.insert( - KafkaListenerName::Controller.listener_ssl_truststore_location(), - format!("{}/truststore.p12", Self::STACKABLE_TLS_KAFKA_INTERNAL_DIR), - ); - config.insert( - KafkaListenerName::Controller.listener_ssl_truststore_password(), - Self::SSL_STORE_PASSWORD.to_string(), - ); - config.insert( - KafkaListenerName::Controller.listener_ssl_truststore_type(), - "PKCS12".to_string(), - ); // client auth required config.insert( KafkaListenerName::Internal.listener_ssl_client_auth(), diff --git a/rust/operator-binary/src/resource/configmap.rs b/rust/operator-binary/src/resource/configmap.rs index ded83c59..4e4e0da3 100644 --- a/rust/operator-binary/src/resource/configmap.rs +++ b/rust/operator-binary/src/resource/configmap.rs @@ -103,7 +103,9 @@ pub fn build_rolegroup_config_map( )?; match merged_config { - AnyConfig::Broker(_) => kafka_config.extend(kafka_security.broker_config_settings()), + AnyConfig::Broker(_) => { + kafka_config.extend(kafka_security.broker_listener_tls_properties()) + } AnyConfig::Controller(_) => { kafka_config.extend(kafka_security.controller_config_settings()) } diff --git a/rust/operator-binary/src/resource/service.rs b/rust/operator-binary/src/resource/service.rs index 5e3e0b56..2665a75f 100644 --- a/rust/operator-binary/src/resource/service.rs +++ b/rust/operator-binary/src/resource/service.rs @@ -133,7 +133,7 @@ fn metrics_ports() -> Vec { fn headless_ports(kafka_security: &KafkaTlsSecurity) -> Vec { vec![ServicePort { name: Some(kafka_security.client_port_name().into()), - port: kafka_security.client_port().into(), + port: kafka_security.internal_port().into(), protocol: Some("TCP".to_string()), ..ServicePort::default() }] diff --git a/rust/operator-binary/src/resource/statefulset.rs b/rust/operator-binary/src/resource/statefulset.rs index 89154dad..e8d48211 100644 --- a/rust/operator-binary/src/resource/statefulset.rs +++ b/rust/operator-binary/src/resource/statefulset.rs @@ -282,6 +282,11 @@ pub fn build_broker_rolegroup_statefulset( }), ..EnvVar::default() }); + env.push(EnvVar { + name: "ROLEGROUP_HEADLESS_SERVICE_NAME".to_string(), + value: Some(rolegroup_ref.rolegroup_headless_service_name()), + ..EnvVar::default() + }); let cluster_id = kafka.cluster_id().context(ClusterIdMissingSnafu)?; @@ -428,7 +433,7 @@ pub fn build_broker_rolegroup_statefulset( .context(MetadataBuildSnafu)? .build(); - if let Some(listener_class) = merged_config.listener_class() { + if let Some(listener_class) = merged_config.client_listener_class() { pod_builder .add_listener_volume_by_listener_class( LISTENER_BROKER_VOLUME_NAME,