diff --git a/crates/stackable-operator/CHANGELOG.md b/crates/stackable-operator/CHANGELOG.md index 53e8bbf3d..8b9a612fe 100644 --- a/crates/stackable-operator/CHANGELOG.md +++ b/crates/stackable-operator/CHANGELOG.md @@ -4,6 +4,21 @@ All notable changes to this project will be documented in this file. ## [Unreleased] +### Added + +- Support `objectOverrides` ([#1118]). + +### Changed + +- BREAKING: `ClusterResources` now requires the objects added to implement `DeepMerge`. + This is very likely a stackable-operator internal change, but technically breaking ([#1118]). + +### Removed + +- BREAKING: `ClusterResources` no longer derives `Eq` and `PartialEq` ([#1118]). + +[#1118]: https://github.com/stackabletech/operator-rs/pull/1118 + ## [0.100.3] - 2025-10-31 ### Changed diff --git a/crates/stackable-operator/src/cluster_resources.rs b/crates/stackable-operator/src/cluster_resources.rs index a7b0c03c1..087965cdd 100644 --- a/crates/stackable-operator/src/cluster_resources.rs +++ b/crates/stackable-operator/src/cluster_resources.rs @@ -8,7 +8,7 @@ use std::{ #[cfg(doc)] use k8s_openapi::api::core::v1::{NodeSelector, Pod}; use k8s_openapi::{ - NamespaceResourceScope, + DeepMerge, NamespaceResourceScope, api::{ apps::v1::{ DaemonSet, DaemonSetSpec, Deployment, DeploymentSpec, StatefulSet, StatefulSetSpec, @@ -42,6 +42,7 @@ use crate::{ Label, LabelError, Labels, consts::{K8S_APP_INSTANCE_KEY, K8S_APP_MANAGED_BY_KEY, K8S_APP_NAME_KEY}, }, + patchinator::{self, ObjectOverrides, apply_patches}, utils::format_full_controller_name, }; @@ -87,6 +88,12 @@ pub enum Error { #[snafu(source(from(crate::client::Error, Box::new)))] source: Box, }, + + #[snafu(display("failed to parse user-provided object overrides"))] + ParseObjectOverrides { source: patchinator::Error }, + + #[snafu(display("failed to apply user-provided object overrides"))] + ApplyObjectOverrides { source: patchinator::Error }, } /// A cluster resource handled by [`ClusterResources`]. @@ -97,6 +104,7 @@ pub enum Error { /// it must be added to [`ClusterResources::delete_orphaned_resources`] as well. pub trait ClusterResource: Clone + + DeepMerge + Debug + DeserializeOwned + Resource @@ -332,6 +340,7 @@ impl ClusterResource for Deployment { /// use serde::{Deserialize, Serialize}; /// use stackable_operator::client::Client; /// use stackable_operator::cluster_resources::{self, ClusterResourceApplyStrategy, ClusterResources}; +/// use stackable_operator::patchinator::ObjectOverrides; /// use stackable_operator::product_config_utils::ValidatedRoleConfigByPropertyKind; /// use stackable_operator::role_utils::Role; /// use std::sync::Arc; @@ -348,7 +357,10 @@ impl ClusterResource for Deployment { /// plural = "AppClusters", /// namespaced, /// )] -/// struct AppClusterSpec {} +/// struct AppClusterSpec { +/// #[serde(flatten)] +/// pub object_overrides: ObjectOverrides, +/// } /// /// enum Error { /// CreateClusterResources { @@ -371,6 +383,7 @@ impl ClusterResource for Deployment { /// CONTROLLER_NAME, /// &app.object_ref(&()), /// ClusterResourceApplyStrategy::Default, +/// &app.spec.object_overrides, /// ) /// .map_err(|source| Error::CreateClusterResources { source })?; /// @@ -413,8 +426,8 @@ impl ClusterResource for Deployment { /// Ok(Action::await_change()) /// } /// ``` -#[derive(Debug, Eq, PartialEq)] -pub struct ClusterResources { +#[derive(Debug)] +pub struct ClusterResources<'a> { /// The namespace of the cluster namespace: String, @@ -442,9 +455,12 @@ pub struct ClusterResources { /// Strategy to manage how cluster resources are applied. Resources could be patched, merged /// or not applied at all depending on the strategy. apply_strategy: ClusterResourceApplyStrategy, + + /// Arbitrary Kubernetes object overrides specified by the user via the CRD. + object_overrides: &'a ObjectOverrides, } -impl ClusterResources { +impl<'a> ClusterResources<'a> { /// Constructs new `ClusterResources`. /// /// # Arguments @@ -470,6 +486,7 @@ impl ClusterResources { controller_name: &str, cluster: &ObjectReference, apply_strategy: ClusterResourceApplyStrategy, + object_overrides: &'a ObjectOverrides, ) -> Result { let namespace = cluster .namespace @@ -494,6 +511,7 @@ impl ClusterResources { manager: format_full_controller_name(operator_name, controller_name), resource_ids: Default::default(), apply_strategy, + object_overrides, }) } @@ -563,7 +581,10 @@ impl ClusterResources { .unwrap_or_else(|err| warn!("{}", err)); } - let mutated = resource.maybe_mutate(&self.apply_strategy); + let mut mutated = resource.maybe_mutate(&self.apply_strategy); + + // We apply the object overrides of the user at the very last to offer maximum flexibility. + apply_patches(&mut mutated, self.object_overrides).context(ApplyObjectOverridesSnafu)?; let patched_resource = self .apply_strategy diff --git a/crates/stackable-operator/src/crd/listener/listeners/v1alpha1_impl.rs b/crates/stackable-operator/src/crd/listener/listeners/v1alpha1_impl.rs index b9351cf32..4c2c5b602 100644 --- a/crates/stackable-operator/src/crd/listener/listeners/v1alpha1_impl.rs +++ b/crates/stackable-operator/src/crd/listener/listeners/v1alpha1_impl.rs @@ -1,7 +1,148 @@ -use crate::crd::listener::listeners::v1alpha1::ListenerSpec; +use k8s_openapi::{DeepMerge, merge_strategies}; + +use crate::crd::listener::listeners::v1alpha1::{ + Listener, ListenerIngress, ListenerPort, ListenerSpec, ListenerStatus, +}; impl ListenerSpec { pub(super) const fn default_publish_not_ready_addresses() -> Option { Some(true) } } + +impl DeepMerge for Listener { + fn merge_from(&mut self, other: Self) { + DeepMerge::merge_from(&mut self.metadata, other.metadata); + DeepMerge::merge_from(&mut self.spec, other.spec); + DeepMerge::merge_from(&mut self.status, other.status); + } +} + +impl DeepMerge for ListenerSpec { + fn merge_from(&mut self, other: Self) { + DeepMerge::merge_from(&mut self.class_name, other.class_name); + merge_strategies::map::granular( + &mut self.extra_pod_selector_labels, + other.extra_pod_selector_labels, + |current_item, other_item| { + DeepMerge::merge_from(current_item, other_item); + }, + ); + merge_strategies::list::map( + &mut self.ports, + other.ports, + &[|lhs, rhs| lhs.name == rhs.name], + |current_item, other_item| { + DeepMerge::merge_from(current_item, other_item); + }, + ); + DeepMerge::merge_from( + &mut self.publish_not_ready_addresses, + other.publish_not_ready_addresses, + ); + } +} + +impl DeepMerge for ListenerStatus { + fn merge_from(&mut self, other: Self) { + DeepMerge::merge_from(&mut self.service_name, other.service_name); + merge_strategies::list::map( + &mut self.ingress_addresses, + other.ingress_addresses, + &[|lhs, rhs| lhs.address == rhs.address], + |current_item, other_item| { + DeepMerge::merge_from(current_item, other_item); + }, + ); + merge_strategies::map::granular( + &mut self.node_ports, + other.node_ports, + |current_item, other_item| { + DeepMerge::merge_from(current_item, other_item); + }, + ); + } +} + +impl DeepMerge for ListenerIngress { + fn merge_from(&mut self, other: Self) { + DeepMerge::merge_from(&mut self.address, other.address); + self.address_type = other.address_type; + merge_strategies::map::granular( + &mut self.ports, + other.ports, + |current_item, other_item| { + DeepMerge::merge_from(current_item, other_item); + }, + ); + } +} + +impl DeepMerge for ListenerPort { + fn merge_from(&mut self, other: Self) { + DeepMerge::merge_from(&mut self.name, other.name); + DeepMerge::merge_from(&mut self.port, other.port); + DeepMerge::merge_from(&mut self.protocol, other.protocol); + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn deep_merge_listener() { + let mut base: ListenerSpec = serde_yaml::from_str( + " +className: my-listener-class +extraPodSelectorLabels: + foo: bar +ports: + - name: http + port: 8080 + protocol: http + - name: https + port: 8080 + protocol: https +# publishNotReadyAddresses defaults to true +", + ) + .unwrap(); + + let patch: ListenerSpec = serde_yaml::from_str( + " +className: custom-listener-class +extraPodSelectorLabels: + foo: overridden + extra: label +ports: + - name: https + port: 8443 +publishNotReadyAddresses: false +", + ) + .unwrap(); + + base.merge_from(patch); + + let expected: ListenerSpec = serde_yaml::from_str( + " +className: custom-listener-class +extraPodSelectorLabels: + foo: overridden + extra: label +ports: + - name: http + port: 8080 + protocol: http + - name: https + port: 8443 # overridden + protocol: https +publishNotReadyAddresses: false +", + ) + .unwrap(); + + assert_eq!(base, expected); + } +} diff --git a/crates/stackable-operator/src/lib.rs b/crates/stackable-operator/src/lib.rs index 5e08ddcab..49295c6eb 100644 --- a/crates/stackable-operator/src/lib.rs +++ b/crates/stackable-operator/src/lib.rs @@ -22,6 +22,7 @@ pub mod kvp; pub mod logging; pub mod memory; pub mod namespace; +pub mod patchinator; pub mod pod_utils; pub mod product_config_utils; pub mod product_logging; diff --git a/crates/stackable-operator/src/patchinator/crd.rs b/crates/stackable-operator/src/patchinator/crd.rs new file mode 100644 index 000000000..3f2e604c5 --- /dev/null +++ b/crates/stackable-operator/src/patchinator/crd.rs @@ -0,0 +1,14 @@ +use kube::api::DynamicObject; +use schemars::JsonSchema; +use serde::{Deserialize, Serialize}; + +use crate::utils::crds::raw_object_list_schema; + +#[derive(Clone, Debug, Deserialize, JsonSchema, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct ObjectOverrides { + /// TODO docs + #[serde(default)] + #[schemars(schema_with = "raw_object_list_schema")] + pub object_overrides: Vec, +} diff --git a/crates/stackable-operator/src/patchinator/mod.rs b/crates/stackable-operator/src/patchinator/mod.rs new file mode 100644 index 000000000..04e7cda81 --- /dev/null +++ b/crates/stackable-operator/src/patchinator/mod.rs @@ -0,0 +1,500 @@ +use k8s_openapi::DeepMerge; +use kube::core::DynamicObject; +use serde::de::DeserializeOwned; +use snafu::{ResultExt, Snafu}; + +mod crd; +pub use crd::ObjectOverrides; + +#[derive(Debug, Snafu)] +pub enum Error { + #[snafu(display( + "failed to parse dynamic object as apiVersion {target_api_version:?} and kind {target_kind:?}" + ))] + ParseDynamicObject { + source: kube::core::dynamic::ParseDynamicObjectError, + target_api_version: String, + target_kind: String, + }, +} + +pub fn apply_patches(base: &mut R, patches: &ObjectOverrides) -> Result<(), Error> +where + R: kube::Resource + DeepMerge + DeserializeOwned, +{ + for patch in &patches.object_overrides { + apply_patch(base, patch)?; + } + Ok(()) +} + +pub fn apply_patch(base: &mut R, patch: &DynamicObject) -> Result<(), Error> +where + R: kube::Resource + DeepMerge + DeserializeOwned, +{ + use kube::ResourceExt; + + let Some(patch_type) = &patch.types else { + return Ok(()); + }; + if patch_type.api_version != R::api_version(&()) || patch_type.kind != R::kind(&()) { + return Ok(()); + } + let Some(patch_name) = &patch.metadata.name else { + return Ok(()); + }; + + // The name always needs to match + if &base.name_any() != patch_name { + return Ok(()); + } + + // If there is a namespace on the base object, it needs to match as well + // Note that it is not set for cluster-scoped objects. + if base.namespace() != patch.metadata.namespace { + return Ok(()); + } + + let deserialized_patch = + patch + .clone() + .try_parse() + .with_context(|_| ParseDynamicObjectSnafu { + target_api_version: R::api_version(&()), + target_kind: R::kind(&()), + })?; + base.merge_from(deserialized_patch); + + Ok(()) +} + +#[cfg(test)] +mod tests { + use std::{collections::BTreeMap, vec}; + + use k8s_openapi::{ + ByteString, Metadata, + api::{ + apps::v1::{ + RollingUpdateStatefulSetStrategy, StatefulSet, StatefulSetSpec, + StatefulSetUpdateStrategy, + }, + core::v1::{ + ConfigMap, Container, ContainerPort, PodSpec, PodTemplateSpec, Secret, + ServiceAccount, + }, + storage::v1::StorageClass, + }, + apimachinery::pkg::util::intstr::IntOrString, + }; + use kube::api::ObjectMeta; + + use super::*; + + /// Using [`serde_yaml`] to generate the test data + fn generate_service_account() -> ServiceAccount { + serde_yaml::from_str( + " +apiVersion: v1 +kind: ServiceAccount +metadata: + name: trino-serviceaccount + namespace: default + labels: + app.kubernetes.io/instance: trino + app.kubernetes.io/managed-by: trino.stackable.tech_trinocluster + app.kubernetes.io/name: trino + ownerReferences: + - apiVersion: trino.stackable.tech/v1alpha1 + controller: true + kind: TrinoCluster + name: trino + uid: c85bfb53-a28e-4782-baaf-3c218a25f192 +", + ) + .unwrap() + } + + /// Generate the test data programmatically (as operators would normally do) + fn generate_stateful_set() -> StatefulSet { + StatefulSet { + metadata: generate_metadata("trino-coordinator-default"), + spec: Some(StatefulSetSpec { + service_name: Some("trino-coordinator-default".to_owned()), + update_strategy: Some(StatefulSetUpdateStrategy { + rolling_update: Some(RollingUpdateStatefulSetStrategy { + max_unavailable: Some(IntOrString::Int(42)), + ..Default::default() + }), + ..Default::default() + }), + template: PodTemplateSpec { + metadata: Some(ObjectMeta { + labels: Some(generate_labels()), + ..Default::default() + }), + spec: Some(PodSpec { + containers: vec![Container { + name: "trino".to_owned(), + image: Some("trino-image".to_owned()), + ports: Some(vec![ContainerPort { + container_port: 8443, + name: Some("https".to_owned()), + protocol: Some("https".to_owned()), + ..Default::default() + }]), + ..Default::default() + }], + service_account_name: Some("trino-serviceaccount".to_owned()), + ..Default::default() + }), + }, + ..Default::default() + }), + ..Default::default() + } + } + + fn generate_metadata(name: impl Into) -> ObjectMeta { + ObjectMeta { + name: Some(name.into()), + namespace: Some("default".to_owned()), + labels: Some(generate_labels()), + ..Default::default() + } + } + + fn generate_labels() -> BTreeMap { + BTreeMap::from([("app.kubernetes.io/name".to_owned(), "trino".to_owned())]) + } + + #[test] + fn service_account_patched() { + let mut sa = generate_service_account(); + let object_overrides: ObjectOverrides = serde_yaml::from_str( + " +objectOverrides: + - apiVersion: v1 + kind: ServiceAccount + metadata: + name: trino-serviceaccount + namespace: default + labels: + app.kubernetes.io/name: overwritten + foo: bar +", + ) + .expect("test input is valid YAML"); + + assert_has_label(&sa, "app.kubernetes.io/name", "trino"); + apply_patches(&mut sa, &object_overrides).unwrap(); + assert_has_label(&sa, "app.kubernetes.io/name", "overwritten"); + } + + #[test] + fn service_account_not_patched_as_different_name() { + let mut sa = generate_service_account(); + let object_overrides: ObjectOverrides = serde_yaml::from_str( + " +objectOverrides: + - apiVersion: v1 + kind: ServiceAccount + metadata: + name: other-sa + namespace: default + labels: + app.kubernetes.io/name: overwritten + foo: bar +", + ) + .expect("test input is valid YAML"); + + let original = sa.clone(); + apply_patches(&mut sa, &object_overrides).unwrap(); + assert_eq!(sa, original, "The patch shouldn't have changed anything"); + } + + #[test] + fn service_account_not_patched_as_different_namespace() { + let mut sa = generate_service_account(); + let object_overrides: ObjectOverrides = serde_yaml::from_str( + " +objectOverrides: + - apiVersion: v1 + kind: ServiceAccount + metadata: + name: trino-serviceaccount + namespace: other-namespace + labels: + app.kubernetes.io/name: overwritten + foo: bar +", + ) + .expect("test input is valid YAML"); + + let original = sa.clone(); + apply_patches(&mut sa, &object_overrides).unwrap(); + assert_eq!(sa, original, "The patch shouldn't have changed anything"); + } + + #[test] + fn service_account_not_patched_as_different_api_version() { + let mut sa = generate_service_account(); + let object_overrides: ObjectOverrides = serde_yaml::from_str( + " +objectOverrides: + - apiVersion: v42 + kind: ServiceAccount + metadata: + name: trino-serviceaccount + namespace: default + labels: + app.kubernetes.io/name: overwritten + foo: bar +", + ) + .expect("test input is valid YAML"); + + let original = sa.clone(); + apply_patches(&mut sa, &object_overrides).unwrap(); + assert_eq!(sa, original, "The patch shouldn't have changed anything"); + } + + #[test] + fn statefulset_patched_multiple_patches() { + let mut sts = generate_stateful_set(); + let object_overrides: ObjectOverrides = serde_yaml::from_str( + " +objectOverrides: + - apiVersion: v1 + kind: ServiceAccount + metadata: + name: trino-serviceaccount + namespace: default + labels: + app.kubernetes.io/name: overwritten + foo: bar + - apiVersion: apps/v1 + kind: StatefulSet + metadata: + name: trino-coordinator-default + namespace: default + spec: + template: + metadata: + labels: + foo: bar + spec: + containers: + - name: trino + image: custom-image + - apiVersion: apps/v1 + kind: StatefulSet + metadata: + name: trino-coordinator-default + namespace: default + spec: + replicas: 3 +", + ) + .expect("test input is valid YAML"); + + let get_replicas = |sts: &StatefulSet| sts.spec.as_ref().unwrap().replicas; + let get_trino_container = |sts: &StatefulSet| { + sts.spec + .as_ref() + .unwrap() + .template + .spec + .as_ref() + .unwrap() + .containers + .iter() + .find(|c| c.name == "trino") + .unwrap() + .clone() + }; + let get_trino_container_image = |sts: &StatefulSet| get_trino_container(sts).image; + + assert_eq!(get_replicas(&sts), None); + assert_eq!( + get_trino_container_image(&sts).as_deref(), + Some("trino-image") + ); + apply_patches(&mut sts, &object_overrides).unwrap(); + assert_eq!(get_replicas(&sts), Some(3)); + assert_eq!( + get_trino_container_image(&sts).as_deref(), + Some("custom-image") + ); + } + + #[test] + fn configmap_patched() { + let mut cm: ConfigMap = serde_yaml::from_str( + " + apiVersion: v1 + kind: ConfigMap + metadata: + name: game-demo + data: + foo: bar + config.properties: |- + coordinator=true + http-server.https.enabled=true + log.properties: |- + =info +", + ) + .unwrap(); + let object_overrides: ObjectOverrides = serde_yaml::from_str( + " +objectOverrides: + - apiVersion: v1 + kind: ConfigMap + metadata: + name: game-demo + data: + foo: overwritten + log.properties: |- + =info,tech.stackable=debug +", + ) + .expect("test input is valid YAML"); + + assert_eq!( + cm.data.as_ref().unwrap(), + &BTreeMap::from([ + ("foo".to_owned(), "bar".to_owned()), + ( + "config.properties".to_owned(), + "coordinator=true\nhttp-server.https.enabled=true".to_owned() + ), + ("log.properties".to_owned(), "=info".to_owned()), + ]) + ); + apply_patches(&mut cm, &object_overrides).unwrap(); + assert_eq!( + cm.data.as_ref().unwrap(), + &BTreeMap::from([ + ("foo".to_owned(), "overwritten".to_owned()), + ( + "config.properties".to_owned(), + "coordinator=true\nhttp-server.https.enabled=true".to_owned() + ), + ( + "log.properties".to_owned(), + "=info,tech.stackable=debug".to_owned() + ), + ]) + ); + } + + #[test] + fn secret_patched() { + let mut secret: Secret = serde_yaml::from_str( + " + apiVersion: v1 + kind: Secret + metadata: + name: dotfile-secret + stringData: + foo: bar + data: + raw: YmFyCg== # echo bar | base64 +", + ) + .unwrap(); + let object_overrides: ObjectOverrides = serde_yaml::from_str( + " +objectOverrides: + - apiVersion: v1 + kind: Secret + metadata: + name: dotfile-secret + stringData: + foo: overwritten + data: + raw: b3ZlcndyaXR0ZW4K # echo overwritten | base64 +", + ) + .expect("test input is valid YAML"); + + assert_eq!( + secret.string_data.as_ref().unwrap(), + &BTreeMap::from([("foo".to_owned(), "bar".to_owned())]) + ); + assert_eq!( + secret.data.as_ref().unwrap(), + &BTreeMap::from([("raw".to_owned(), ByteString(b"bar\n".to_vec()))]) + ); + + apply_patches(&mut secret, &object_overrides).unwrap(); + assert_eq!( + secret.string_data.as_ref().unwrap(), + &BTreeMap::from([("foo".to_owned(), "overwritten".to_owned()),]) + ); + assert_eq!( + secret.data.as_ref().unwrap(), + &BTreeMap::from([("raw".to_owned(), ByteString(b"overwritten\n".to_vec()))]) + ); + } + + #[test] + fn cluster_scoped_object_patched() { + let mut storage_class: StorageClass = serde_yaml::from_str( + " + apiVersion: storage.k8s.io/v1 + kind: StorageClass + metadata: + name: low-latency + labels: + foo: original + annotations: + storageclass.kubernetes.io/is-default-class: \"false\" + provisioner: csi-driver.example-vendor.example +", + ) + .unwrap(); + let object_overrides: ObjectOverrides = serde_yaml::from_str( + " +objectOverrides: + - apiVersion: v1 + kind: ServiceAccount + - apiVersion: storage.k8s.io/v1 + kind: StorageClass + metadata: + name: low-latency + labels: + foo: overwritten + annotations: + new: annotation + provisioner: custom-provisioner + - foo: bar + - {} +", + ) + .expect("test input is valid YAML"); + + assert_has_label(&storage_class, "foo", "original"); + apply_patches(&mut storage_class, &object_overrides).unwrap(); + assert_has_label(&storage_class, "foo", "overwritten"); + } + + fn assert_has_label>( + object: &O, + key: impl AsRef, + value: impl AsRef, + ) { + assert_eq!( + object + .metadata() + .labels + .as_ref() + .expect("labels missing") + .get(key.as_ref()) + .expect("key missing from labels"), + value.as_ref() + ); + } +} diff --git a/crates/stackable-shared/src/lib.rs b/crates/stackable-shared/src/lib.rs index 2f9b8ae93..767726d3d 100644 --- a/crates/stackable-shared/src/lib.rs +++ b/crates/stackable-shared/src/lib.rs @@ -3,6 +3,5 @@ pub mod crd; pub mod secret; - pub mod time; pub mod yaml;