From 00fd1b7f3fc1a6bc8ee2fc330a1b72cb5fe70222 Mon Sep 17 00:00:00 2001 From: Sebastian Bernauer Date: Mon, 10 Nov 2025 16:03:09 +0100 Subject: [PATCH 1/6] feat: Support `objectOverrides` --- .../src/cluster_resources.rs | 31 +- .../crd/listener/listeners/v1alpha1_impl.rs | 89 +++- crates/stackable-shared/src/lib.rs | 2 +- .../stackable-shared/src/patchinator/mod.rs | 504 ++++++++++++++++++ 4 files changed, 620 insertions(+), 6 deletions(-) create mode 100644 crates/stackable-shared/src/patchinator/mod.rs diff --git a/crates/stackable-operator/src/cluster_resources.rs b/crates/stackable-operator/src/cluster_resources.rs index a7b0c03c1..e33fd5ac5 100644 --- a/crates/stackable-operator/src/cluster_resources.rs +++ b/crates/stackable-operator/src/cluster_resources.rs @@ -8,7 +8,7 @@ use std::{ #[cfg(doc)] use k8s_openapi::api::core::v1::{NodeSelector, Pod}; use k8s_openapi::{ - NamespaceResourceScope, + DeepMerge, NamespaceResourceScope, api::{ apps::v1::{ DaemonSet, DaemonSetSpec, Deployment, DeploymentSpec, StatefulSet, StatefulSetSpec, @@ -22,9 +22,10 @@ use k8s_openapi::{ }, apimachinery::pkg::apis::meta::v1::{LabelSelector, LabelSelectorRequirement}, }; -use kube::{Resource, ResourceExt, core::ErrorResponse}; +use kube::{Resource, ResourceExt, api::DynamicObject, core::ErrorResponse}; use serde::{Serialize, de::DeserializeOwned}; use snafu::{OptionExt, ResultExt, Snafu}; +use stackable_shared::patchinator::{self, apply_patches, parse_patches}; use strum::Display; use tracing::{debug, info, warn}; @@ -87,6 +88,12 @@ pub enum Error { #[snafu(source(from(crate::client::Error, Box::new)))] source: Box, }, + + #[snafu(display("failed to parse user-provided object overrides"))] + ParseObjectOverrides { source: patchinator::Error }, + + #[snafu(display("failed to apply user-provided object overrides"))] + ApplyObjectOverrides { source: patchinator::Error }, } /// A cluster resource handled by [`ClusterResources`]. @@ -97,6 +104,7 @@ pub enum Error { /// it must be added to [`ClusterResources::delete_orphaned_resources`] as well. pub trait ClusterResource: Clone + + DeepMerge + Debug + DeserializeOwned + Resource @@ -413,7 +421,7 @@ impl ClusterResource for Deployment { /// Ok(Action::await_change()) /// } /// ``` -#[derive(Debug, Eq, PartialEq)] +#[derive(Debug)] pub struct ClusterResources { /// The namespace of the cluster namespace: String, @@ -442,6 +450,9 @@ pub struct ClusterResources { /// Strategy to manage how cluster resources are applied. Resources could be patched, merged /// or not applied at all depending on the strategy. apply_strategy: ClusterResourceApplyStrategy, + + /// Arbitrary Kubernetes object overrides specified by the user via the CRD. + object_overrides: Vec, } impl ClusterResources { @@ -470,6 +481,7 @@ impl ClusterResources { controller_name: &str, cluster: &ObjectReference, apply_strategy: ClusterResourceApplyStrategy, + object_overrides: Option>, ) -> Result { let namespace = cluster .namespace @@ -483,6 +495,12 @@ impl ClusterResources { .uid .clone() .context(MissingObjectKeySnafu { key: "uid" })?; + let object_overrides = match object_overrides { + Some(object_overrides) => { + parse_patches(object_overrides).context(ParseObjectOverridesSnafu)? + } + None => vec![], + }; Ok(ClusterResources { namespace, @@ -494,6 +512,7 @@ impl ClusterResources { manager: format_full_controller_name(operator_name, controller_name), resource_ids: Default::default(), apply_strategy, + object_overrides, }) } @@ -563,7 +582,11 @@ impl ClusterResources { .unwrap_or_else(|err| warn!("{}", err)); } - let mutated = resource.maybe_mutate(&self.apply_strategy); + let mut mutated = resource.maybe_mutate(&self.apply_strategy); + + // We apply the object overrides of the user at the very last to offer maximum flexibility. + apply_patches(&mut mutated, self.object_overrides.iter()) + .context(ApplyObjectOverridesSnafu)?; let patched_resource = self .apply_strategy diff --git a/crates/stackable-operator/src/crd/listener/listeners/v1alpha1_impl.rs b/crates/stackable-operator/src/crd/listener/listeners/v1alpha1_impl.rs index b9351cf32..cf03f0522 100644 --- a/crates/stackable-operator/src/crd/listener/listeners/v1alpha1_impl.rs +++ b/crates/stackable-operator/src/crd/listener/listeners/v1alpha1_impl.rs @@ -1,7 +1,94 @@ -use crate::crd::listener::listeners::v1alpha1::ListenerSpec; +use crate::crd::listener::listeners::v1alpha1::{ + Listener, ListenerIngress, ListenerPort, ListenerSpec, ListenerStatus, +}; impl ListenerSpec { pub(super) const fn default_publish_not_ready_addresses() -> Option { Some(true) } } + +impl k8s_openapi::DeepMerge for Listener { + fn merge_from(&mut self, other: Self) { + k8s_openapi::DeepMerge::merge_from(&mut self.metadata, other.metadata); + k8s_openapi::DeepMerge::merge_from(&mut self.spec, other.spec); + k8s_openapi::DeepMerge::merge_from(&mut self.status, other.status); + } +} + +impl k8s_openapi::DeepMerge for ListenerSpec { + fn merge_from(&mut self, other: Self) { + k8s_openapi::DeepMerge::merge_from(&mut self.class_name, other.class_name); + k8s_openapi::merge_strategies::map::granular( + &mut self.extra_pod_selector_labels, + other.extra_pod_selector_labels, + |current_item, other_item| { + k8s_openapi::DeepMerge::merge_from(current_item, other_item); + }, + ); + k8s_openapi::merge_strategies::list::map( + &mut self.ports, + other.ports, + &[|lhs, rhs| lhs.name == rhs.name], + |current_item, other_item| { + k8s_openapi::DeepMerge::merge_from(current_item, other_item); + }, + ); + k8s_openapi::DeepMerge::merge_from( + &mut self.publish_not_ready_addresses, + other.publish_not_ready_addresses, + ); + todo!() + } +} + +impl k8s_openapi::DeepMerge for ListenerStatus { + fn merge_from(&mut self, other: Self) { + k8s_openapi::DeepMerge::merge_from(&mut self.service_name, other.service_name); + k8s_openapi::merge_strategies::list::map( + &mut self.ingress_addresses, + other.ingress_addresses, + &[|lhs, rhs| lhs.address == rhs.address], + |current_item, other_item| { + k8s_openapi::DeepMerge::merge_from(current_item, other_item); + }, + ); + k8s_openapi::merge_strategies::map::granular( + &mut self.node_ports, + other.node_ports, + |current_item, other_item| { + k8s_openapi::DeepMerge::merge_from(current_item, other_item); + }, + ); + } +} + +impl k8s_openapi::DeepMerge for ListenerIngress { + fn merge_from(&mut self, other: Self) { + k8s_openapi::DeepMerge::merge_from(&mut self.address, other.address); + self.address_type = other.address_type; + k8s_openapi::merge_strategies::map::granular( + &mut self.ports, + other.ports, + |current_item, other_item| { + k8s_openapi::DeepMerge::merge_from(current_item, other_item); + }, + ); + } +} + +impl k8s_openapi::DeepMerge for ListenerPort { + fn merge_from(&mut self, other: Self) { + k8s_openapi::DeepMerge::merge_from(&mut self.name, other.name); + k8s_openapi::DeepMerge::merge_from(&mut self.port, other.port); + k8s_openapi::DeepMerge::merge_from(&mut self.protocol, other.protocol); + } +} + +#[cfg(test)] +mod tests { + #[test] + fn deep_merge_listener() { + todo!("Add some basic tests for merging"); + } +} diff --git a/crates/stackable-shared/src/lib.rs b/crates/stackable-shared/src/lib.rs index 2f9b8ae93..bb49166fe 100644 --- a/crates/stackable-shared/src/lib.rs +++ b/crates/stackable-shared/src/lib.rs @@ -2,7 +2,7 @@ //! workspace. pub mod crd; +pub mod patchinator; pub mod secret; - pub mod time; pub mod yaml; diff --git a/crates/stackable-shared/src/patchinator/mod.rs b/crates/stackable-shared/src/patchinator/mod.rs new file mode 100644 index 000000000..6beb58a46 --- /dev/null +++ b/crates/stackable-shared/src/patchinator/mod.rs @@ -0,0 +1,504 @@ +use k8s_openapi::DeepMerge; +use kube::core::DynamicObject; +use serde::{Deserialize, de::DeserializeOwned}; +use snafu::{ResultExt, Snafu}; + +#[derive(Debug, Snafu)] +pub enum Error { + #[snafu(display("failed to deserialize dynamic object"))] + DeserializeDynamicObject { source: serde_yaml::Error }, + + #[snafu(display( + "failed to parse dynamic object as apiVersion {target_api_version:?} and kind {target_kind:?}" + ))] + ParseDynamicObject { + source: kube::core::dynamic::ParseDynamicObjectError, + target_api_version: String, + target_kind: String, + }, +} + +pub fn parse_patches(patch: impl AsRef) -> Result, Error> { + serde_yaml::Deserializer::from_str(patch.as_ref()) + .map(|manifest| DynamicObject::deserialize(manifest).context(DeserializeDynamicObjectSnafu)) + .collect() +} + +pub fn apply_patches<'a, R>( + base: &mut R, + patches: impl Iterator, +) -> Result<(), Error> +where + R: kube::Resource + DeepMerge + DeserializeOwned, +{ + for patch in patches { + apply_patch(base, patch)?; + } + Ok(()) +} + +pub fn apply_patch(base: &mut R, patch: &DynamicObject) -> Result<(), Error> +where + R: kube::Resource + DeepMerge + DeserializeOwned, +{ + use kube::ResourceExt; + + let Some(patch_type) = &patch.types else { + return Ok(()); + }; + if patch_type.api_version != R::api_version(&()) || patch_type.kind != R::kind(&()) { + return Ok(()); + } + let Some(patch_name) = &patch.metadata.name else { + return Ok(()); + }; + + // The name always needs to match + if &base.name_any() != patch_name { + return Ok(()); + } + + // If there is a namespace on the base object, it needs to match as well + // Note that it is not set for cluster-scoped objects. + if base.namespace() != patch.metadata.namespace { + return Ok(()); + } + + let deserialized_patch = + patch + .clone() + .try_parse() + .with_context(|_| ParseDynamicObjectSnafu { + target_api_version: R::api_version(&()), + target_kind: R::kind(&()), + })?; + base.merge_from(deserialized_patch); + + Ok(()) +} + +#[cfg(test)] +mod tests { + use std::{collections::BTreeMap, vec}; + + use k8s_openapi::{ + ByteString, Metadata, + api::{ + apps::v1::{ + RollingUpdateStatefulSetStrategy, StatefulSet, StatefulSetSpec, + StatefulSetUpdateStrategy, + }, + core::v1::{ + ConfigMap, Container, ContainerPort, PodSpec, PodTemplateSpec, Secret, + ServiceAccount, + }, + storage::v1::StorageClass, + }, + apimachinery::pkg::util::intstr::IntOrString, + }; + use kube::api::ObjectMeta; + + use super::*; + + fn generate_service_account() -> ServiceAccount { + serde_yaml::from_str( + " +apiVersion: v1 +kind: ServiceAccount +metadata: + name: trino-serviceaccount + namespace: default + labels: + app.kubernetes.io/instance: trino + app.kubernetes.io/managed-by: trino.stackable.tech_trinocluster + app.kubernetes.io/name: trino + ownerReferences: + - apiVersion: trino.stackable.tech/v1alpha1 + controller: true + kind: TrinoCluster + name: trino + uid: c85bfb53-a28e-4782-baaf-3c218a25f192 +", + ) + .unwrap() + } + + fn generate_stateful_set() -> StatefulSet { + StatefulSet { + metadata: generate_metadata("trino-coordinator-default"), + spec: Some(StatefulSetSpec { + service_name: Some("trino-coordinator-default".to_owned()), + update_strategy: Some(StatefulSetUpdateStrategy { + rolling_update: Some(RollingUpdateStatefulSetStrategy { + max_unavailable: Some(IntOrString::Int(42)), + ..Default::default() + }), + ..Default::default() + }), + template: PodTemplateSpec { + metadata: Some(ObjectMeta { + labels: Some(generate_labels()), + ..Default::default() + }), + spec: Some(PodSpec { + containers: vec![Container { + name: "trino".to_owned(), + image: Some("trino-image".to_owned()), + ports: Some(vec![ContainerPort { + container_port: 8443, + name: Some("https".to_owned()), + protocol: Some("https".to_owned()), + ..Default::default() + }]), + ..Default::default() + }], + service_account_name: Some("trino-serviceaccount".to_owned()), + ..Default::default() + }), + }, + ..Default::default() + }), + ..Default::default() + } + } + + fn generate_metadata(name: impl Into) -> ObjectMeta { + ObjectMeta { + name: Some(name.into()), + namespace: Some("default".to_owned()), + labels: Some(generate_labels()), + ..Default::default() + } + } + + fn generate_labels() -> BTreeMap { + BTreeMap::from([("app.kubernetes.io/name".to_owned(), "trino".to_owned())]) + } + + #[test] + fn service_account_patched() { + let mut sa = generate_service_account(); + let patches = parse_patches( + " +apiVersion: v1 +kind: ServiceAccount +metadata: + name: trino-serviceaccount + namespace: default + labels: + app.kubernetes.io/name: overwritten + foo: bar +", + ) + .unwrap(); + + assert_has_label(&sa, "app.kubernetes.io/name", "trino"); + apply_patches(&mut sa, patches.iter()).unwrap(); + assert_has_label(&sa, "app.kubernetes.io/name", "overwritten"); + } + + #[test] + fn service_account_not_patched_as_different_name() { + let mut sa = generate_service_account(); + let patches = parse_patches( + " +apiVersion: v1 +kind: ServiceAccount +metadata: + name: other-sa + namespace: default + labels: + app.kubernetes.io/name: overwritten + foo: bar +", + ) + .unwrap(); + + let original = sa.clone(); + apply_patches(&mut sa, patches.iter()).unwrap(); + assert_eq!(sa, original, "The patch shouldn't have changed anything"); + } + + #[test] + fn service_account_not_patched_as_different_namespace() { + let mut sa = generate_service_account(); + let patches = parse_patches( + " +apiVersion: v1 +kind: ServiceAccount +metadata: + name: trino-serviceaccount + namespace: other-namespace + labels: + app.kubernetes.io/name: overwritten + foo: bar +", + ) + .unwrap(); + + let original = sa.clone(); + apply_patches(&mut sa, patches.iter()).unwrap(); + assert_eq!(sa, original, "The patch shouldn't have changed anything"); + } + + #[test] + fn service_account_not_patched_as_different_api_version() { + let mut sa = generate_service_account(); + let patches = parse_patches( + " +apiVersion: v42 +kind: ServiceAccount +metadata: + name: trino-serviceaccount + namespace: default + labels: + app.kubernetes.io/name: overwritten + foo: bar +", + ) + .unwrap(); + + let original = sa.clone(); + apply_patches(&mut sa, patches.iter()).unwrap(); + assert_eq!(sa, original, "The patch shouldn't have changed anything"); + } + + #[test] + fn statefulset_patched_multiple_patches() { + let mut sts = generate_stateful_set(); + + let patches = parse_patches( + " +apiVersion: v1 +kind: ServiceAccount +metadata: + name: trino-serviceaccount + namespace: default + labels: + app.kubernetes.io/name: overwritten + foo: bar +--- +apiVersion: apps/v1 +kind: StatefulSet +metadata: + name: trino-coordinator-default + namespace: default +spec: + template: + metadata: + labels: + foo: bar + spec: + containers: + - name: trino + image: custom-image +--- +apiVersion: apps/v1 +kind: StatefulSet +metadata: + name: trino-coordinator-default + namespace: default +spec: + replicas: 3 +", + ) + .unwrap(); + + let get_replicas = |sts: &StatefulSet| sts.spec.as_ref().unwrap().replicas; + let get_trino_container = |sts: &StatefulSet| { + sts.spec + .as_ref() + .unwrap() + .template + .spec + .as_ref() + .unwrap() + .containers + .iter() + .find(|c| c.name == "trino") + .unwrap() + .clone() + }; + let get_trino_container_image = |sts: &StatefulSet| get_trino_container(sts).image; + + assert_eq!(get_replicas(&sts), None); + assert_eq!( + get_trino_container_image(&sts).as_deref(), + Some("trino-image") + ); + apply_patches(&mut sts, patches.iter()).unwrap(); + assert_eq!(get_replicas(&sts), Some(3)); + assert_eq!( + get_trino_container_image(&sts).as_deref(), + Some("custom-image") + ); + } + + #[test] + fn configmap_patched() { + let mut cm: ConfigMap = serde_yaml::from_str( + " +apiVersion: v1 +kind: ConfigMap +metadata: + name: game-demo +data: + foo: bar + config.properties: |- + coordinator=true + http-server.https.enabled=true + log.properties: |- + =info +", + ) + .unwrap(); + let patches = parse_patches( + " +apiVersion: v1 +kind: ConfigMap +metadata: + name: game-demo +data: + foo: overwritten + log.properties: |- + =info,tech.stackable=debug +", + ) + .unwrap(); + + assert_eq!( + cm.data.as_ref().unwrap(), + &BTreeMap::from([ + ("foo".to_owned(), "bar".to_owned()), + ( + "config.properties".to_owned(), + "coordinator=true\nhttp-server.https.enabled=true".to_owned() + ), + ("log.properties".to_owned(), "=info".to_owned()), + ]) + ); + apply_patches(&mut cm, patches.iter()).unwrap(); + assert_eq!( + cm.data.as_ref().unwrap(), + &BTreeMap::from([ + ("foo".to_owned(), "overwritten".to_owned()), + ( + "config.properties".to_owned(), + "coordinator=true\nhttp-server.https.enabled=true".to_owned() + ), + ( + "log.properties".to_owned(), + "=info,tech.stackable=debug".to_owned() + ), + ]) + ); + } + + #[test] + fn secret_patched() { + let mut secret: Secret = serde_yaml::from_str( + " +apiVersion: v1 +kind: Secret +metadata: + name: dotfile-secret +stringData: + foo: bar +data: + raw: YmFyCg== # echo bar | base64 +", + ) + .unwrap(); + let patches = parse_patches( + " +apiVersion: v1 +kind: Secret +metadata: + name: dotfile-secret +stringData: + foo: overwritten +data: + raw: b3ZlcndyaXR0ZW4K # echo overwritten | base64 +", + ) + .unwrap(); + + assert_eq!( + secret.string_data.as_ref().unwrap(), + &BTreeMap::from([("foo".to_owned(), "bar".to_owned())]) + ); + assert_eq!( + secret.data.as_ref().unwrap(), + &BTreeMap::from([("raw".to_owned(), ByteString(b"bar\n".to_vec()))]) + ); + + apply_patches(&mut secret, patches.iter()).unwrap(); + assert_eq!( + secret.string_data.as_ref().unwrap(), + &BTreeMap::from([("foo".to_owned(), "overwritten".to_owned()),]) + ); + assert_eq!( + secret.data.as_ref().unwrap(), + &BTreeMap::from([("raw".to_owned(), ByteString(b"overwritten\n".to_vec()))]) + ); + } + + #[test] + fn cluster_scoped_object_patched() { + let mut storage_class: StorageClass = serde_yaml::from_str( + " +apiVersion: storage.k8s.io/v1 +kind: StorageClass +metadata: + name: low-latency + labels: + foo: original + annotations: + storageclass.kubernetes.io/is-default-class: \"false\" +provisioner: csi-driver.example-vendor.example +", + ) + .unwrap(); + let patches = parse_patches( + " +--- +apiVersion: v1 +kind: ServiceAccount +--- +apiVersion: storage.k8s.io/v1 +kind: StorageClass +metadata: + name: low-latency + labels: + foo: overwritten + annotations: + new: annotation +provisioner: custom-provisioner +--- +foo: bar +", + ) + .unwrap(); + + assert_has_label(&storage_class, "foo", "original"); + apply_patches(&mut storage_class, patches.iter()).unwrap(); + assert_has_label(&storage_class, "foo", "overwritten"); + } + + fn assert_has_label>( + object: &O, + key: impl AsRef, + value: impl AsRef, + ) { + assert_eq!( + object + .metadata() + .labels + .as_ref() + .expect("labels missing") + .get(key.as_ref()) + .expect("key missing from labels"), + value.as_ref() + ); + } +} From 38a09962d74f0f446510fc1dcaa8c0ec03726d96 Mon Sep 17 00:00:00 2001 From: Sebastian Bernauer Date: Tue, 11 Nov 2025 10:48:40 +0100 Subject: [PATCH 2/6] refactor: Switch to a lis of objects (as opposed to a big string field) --- .../src/cluster_resources.rs | 21 +- crates/stackable-operator/src/lib.rs | 1 + .../stackable-operator/src/patchinator/crd.rs | 13 + .../src/patchinator/mod.rs | 312 +++++++++--------- crates/stackable-shared/src/lib.rs | 1 - 5 files changed, 175 insertions(+), 173 deletions(-) create mode 100644 crates/stackable-operator/src/patchinator/crd.rs rename crates/{stackable-shared => stackable-operator}/src/patchinator/mod.rs (68%) diff --git a/crates/stackable-operator/src/cluster_resources.rs b/crates/stackable-operator/src/cluster_resources.rs index e33fd5ac5..f02fdf2d2 100644 --- a/crates/stackable-operator/src/cluster_resources.rs +++ b/crates/stackable-operator/src/cluster_resources.rs @@ -22,10 +22,9 @@ use k8s_openapi::{ }, apimachinery::pkg::apis::meta::v1::{LabelSelector, LabelSelectorRequirement}, }; -use kube::{Resource, ResourceExt, api::DynamicObject, core::ErrorResponse}; +use kube::{Resource, ResourceExt, core::ErrorResponse}; use serde::{Serialize, de::DeserializeOwned}; use snafu::{OptionExt, ResultExt, Snafu}; -use stackable_shared::patchinator::{self, apply_patches, parse_patches}; use strum::Display; use tracing::{debug, info, warn}; @@ -43,6 +42,7 @@ use crate::{ Label, LabelError, Labels, consts::{K8S_APP_INSTANCE_KEY, K8S_APP_MANAGED_BY_KEY, K8S_APP_NAME_KEY}, }, + patchinator::{self, ObjectOverrides, apply_patches}, utils::format_full_controller_name, }; @@ -422,7 +422,7 @@ impl ClusterResource for Deployment { /// } /// ``` #[derive(Debug)] -pub struct ClusterResources { +pub struct ClusterResources<'a> { /// The namespace of the cluster namespace: String, @@ -452,10 +452,10 @@ pub struct ClusterResources { apply_strategy: ClusterResourceApplyStrategy, /// Arbitrary Kubernetes object overrides specified by the user via the CRD. - object_overrides: Vec, + object_overrides: &'a ObjectOverrides, } -impl ClusterResources { +impl<'a> ClusterResources<'a> { /// Constructs new `ClusterResources`. /// /// # Arguments @@ -481,7 +481,7 @@ impl ClusterResources { controller_name: &str, cluster: &ObjectReference, apply_strategy: ClusterResourceApplyStrategy, - object_overrides: Option>, + object_overrides: &'a ObjectOverrides, ) -> Result { let namespace = cluster .namespace @@ -495,12 +495,6 @@ impl ClusterResources { .uid .clone() .context(MissingObjectKeySnafu { key: "uid" })?; - let object_overrides = match object_overrides { - Some(object_overrides) => { - parse_patches(object_overrides).context(ParseObjectOverridesSnafu)? - } - None => vec![], - }; Ok(ClusterResources { namespace, @@ -585,8 +579,7 @@ impl ClusterResources { let mut mutated = resource.maybe_mutate(&self.apply_strategy); // We apply the object overrides of the user at the very last to offer maximum flexibility. - apply_patches(&mut mutated, self.object_overrides.iter()) - .context(ApplyObjectOverridesSnafu)?; + apply_patches(&mut mutated, self.object_overrides).context(ApplyObjectOverridesSnafu)?; let patched_resource = self .apply_strategy diff --git a/crates/stackable-operator/src/lib.rs b/crates/stackable-operator/src/lib.rs index 5e08ddcab..49295c6eb 100644 --- a/crates/stackable-operator/src/lib.rs +++ b/crates/stackable-operator/src/lib.rs @@ -22,6 +22,7 @@ pub mod kvp; pub mod logging; pub mod memory; pub mod namespace; +pub mod patchinator; pub mod pod_utils; pub mod product_config_utils; pub mod product_logging; diff --git a/crates/stackable-operator/src/patchinator/crd.rs b/crates/stackable-operator/src/patchinator/crd.rs new file mode 100644 index 000000000..47b3c9f1d --- /dev/null +++ b/crates/stackable-operator/src/patchinator/crd.rs @@ -0,0 +1,13 @@ +use kube::api::DynamicObject; +use schemars::JsonSchema; +use serde::{Deserialize, Serialize}; + +use crate::utils::crds::raw_object_list_schema; + +#[derive(Clone, Debug, Deserialize, JsonSchema, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct ObjectOverrides { + #[serde(default)] + #[schemars(schema_with = "raw_object_list_schema")] + pub object_overrides: Vec, +} diff --git a/crates/stackable-shared/src/patchinator/mod.rs b/crates/stackable-operator/src/patchinator/mod.rs similarity index 68% rename from crates/stackable-shared/src/patchinator/mod.rs rename to crates/stackable-operator/src/patchinator/mod.rs index 6beb58a46..04e7cda81 100644 --- a/crates/stackable-shared/src/patchinator/mod.rs +++ b/crates/stackable-operator/src/patchinator/mod.rs @@ -1,13 +1,13 @@ use k8s_openapi::DeepMerge; use kube::core::DynamicObject; -use serde::{Deserialize, de::DeserializeOwned}; +use serde::de::DeserializeOwned; use snafu::{ResultExt, Snafu}; +mod crd; +pub use crd::ObjectOverrides; + #[derive(Debug, Snafu)] pub enum Error { - #[snafu(display("failed to deserialize dynamic object"))] - DeserializeDynamicObject { source: serde_yaml::Error }, - #[snafu(display( "failed to parse dynamic object as apiVersion {target_api_version:?} and kind {target_kind:?}" ))] @@ -18,20 +18,11 @@ pub enum Error { }, } -pub fn parse_patches(patch: impl AsRef) -> Result, Error> { - serde_yaml::Deserializer::from_str(patch.as_ref()) - .map(|manifest| DynamicObject::deserialize(manifest).context(DeserializeDynamicObjectSnafu)) - .collect() -} - -pub fn apply_patches<'a, R>( - base: &mut R, - patches: impl Iterator, -) -> Result<(), Error> +pub fn apply_patches(base: &mut R, patches: &ObjectOverrides) -> Result<(), Error> where R: kube::Resource + DeepMerge + DeserializeOwned, { - for patch in patches { + for patch in &patches.object_overrides { apply_patch(base, patch)?; } Ok(()) @@ -100,6 +91,7 @@ mod tests { use super::*; + /// Using [`serde_yaml`] to generate the test data fn generate_service_account() -> ServiceAccount { serde_yaml::from_str( " @@ -123,6 +115,7 @@ metadata: .unwrap() } + /// Generate the test data programmatically (as operators would normally do) fn generate_stateful_set() -> StatefulSet { StatefulSet { metadata: generate_metadata("trino-coordinator-default"), @@ -178,131 +171,133 @@ metadata: #[test] fn service_account_patched() { let mut sa = generate_service_account(); - let patches = parse_patches( + let object_overrides: ObjectOverrides = serde_yaml::from_str( " -apiVersion: v1 -kind: ServiceAccount -metadata: - name: trino-serviceaccount - namespace: default - labels: - app.kubernetes.io/name: overwritten - foo: bar +objectOverrides: + - apiVersion: v1 + kind: ServiceAccount + metadata: + name: trino-serviceaccount + namespace: default + labels: + app.kubernetes.io/name: overwritten + foo: bar ", ) - .unwrap(); + .expect("test input is valid YAML"); assert_has_label(&sa, "app.kubernetes.io/name", "trino"); - apply_patches(&mut sa, patches.iter()).unwrap(); + apply_patches(&mut sa, &object_overrides).unwrap(); assert_has_label(&sa, "app.kubernetes.io/name", "overwritten"); } #[test] fn service_account_not_patched_as_different_name() { let mut sa = generate_service_account(); - let patches = parse_patches( + let object_overrides: ObjectOverrides = serde_yaml::from_str( " -apiVersion: v1 -kind: ServiceAccount -metadata: - name: other-sa - namespace: default - labels: - app.kubernetes.io/name: overwritten - foo: bar +objectOverrides: + - apiVersion: v1 + kind: ServiceAccount + metadata: + name: other-sa + namespace: default + labels: + app.kubernetes.io/name: overwritten + foo: bar ", ) - .unwrap(); + .expect("test input is valid YAML"); let original = sa.clone(); - apply_patches(&mut sa, patches.iter()).unwrap(); + apply_patches(&mut sa, &object_overrides).unwrap(); assert_eq!(sa, original, "The patch shouldn't have changed anything"); } #[test] fn service_account_not_patched_as_different_namespace() { let mut sa = generate_service_account(); - let patches = parse_patches( + let object_overrides: ObjectOverrides = serde_yaml::from_str( " -apiVersion: v1 -kind: ServiceAccount -metadata: - name: trino-serviceaccount - namespace: other-namespace - labels: - app.kubernetes.io/name: overwritten - foo: bar +objectOverrides: + - apiVersion: v1 + kind: ServiceAccount + metadata: + name: trino-serviceaccount + namespace: other-namespace + labels: + app.kubernetes.io/name: overwritten + foo: bar ", ) - .unwrap(); + .expect("test input is valid YAML"); let original = sa.clone(); - apply_patches(&mut sa, patches.iter()).unwrap(); + apply_patches(&mut sa, &object_overrides).unwrap(); assert_eq!(sa, original, "The patch shouldn't have changed anything"); } #[test] fn service_account_not_patched_as_different_api_version() { let mut sa = generate_service_account(); - let patches = parse_patches( + let object_overrides: ObjectOverrides = serde_yaml::from_str( " -apiVersion: v42 -kind: ServiceAccount -metadata: - name: trino-serviceaccount - namespace: default - labels: - app.kubernetes.io/name: overwritten - foo: bar +objectOverrides: + - apiVersion: v42 + kind: ServiceAccount + metadata: + name: trino-serviceaccount + namespace: default + labels: + app.kubernetes.io/name: overwritten + foo: bar ", ) - .unwrap(); + .expect("test input is valid YAML"); let original = sa.clone(); - apply_patches(&mut sa, patches.iter()).unwrap(); + apply_patches(&mut sa, &object_overrides).unwrap(); assert_eq!(sa, original, "The patch shouldn't have changed anything"); } #[test] fn statefulset_patched_multiple_patches() { let mut sts = generate_stateful_set(); - - let patches = parse_patches( + let object_overrides: ObjectOverrides = serde_yaml::from_str( " -apiVersion: v1 -kind: ServiceAccount -metadata: - name: trino-serviceaccount - namespace: default - labels: - app.kubernetes.io/name: overwritten - foo: bar ---- -apiVersion: apps/v1 -kind: StatefulSet -metadata: - name: trino-coordinator-default - namespace: default -spec: - template: +objectOverrides: + - apiVersion: v1 + kind: ServiceAccount metadata: + name: trino-serviceaccount + namespace: default labels: + app.kubernetes.io/name: overwritten foo: bar + - apiVersion: apps/v1 + kind: StatefulSet + metadata: + name: trino-coordinator-default + namespace: default spec: - containers: - - name: trino - image: custom-image ---- -apiVersion: apps/v1 -kind: StatefulSet -metadata: - name: trino-coordinator-default - namespace: default -spec: - replicas: 3 + template: + metadata: + labels: + foo: bar + spec: + containers: + - name: trino + image: custom-image + - apiVersion: apps/v1 + kind: StatefulSet + metadata: + name: trino-coordinator-default + namespace: default + spec: + replicas: 3 ", ) - .unwrap(); + .expect("test input is valid YAML"); let get_replicas = |sts: &StatefulSet| sts.spec.as_ref().unwrap().replicas; let get_trino_container = |sts: &StatefulSet| { @@ -326,7 +321,7 @@ spec: get_trino_container_image(&sts).as_deref(), Some("trino-image") ); - apply_patches(&mut sts, patches.iter()).unwrap(); + apply_patches(&mut sts, &object_overrides).unwrap(); assert_eq!(get_replicas(&sts), Some(3)); assert_eq!( get_trino_container_image(&sts).as_deref(), @@ -338,33 +333,34 @@ spec: fn configmap_patched() { let mut cm: ConfigMap = serde_yaml::from_str( " -apiVersion: v1 -kind: ConfigMap -metadata: - name: game-demo -data: - foo: bar - config.properties: |- - coordinator=true - http-server.https.enabled=true - log.properties: |- - =info + apiVersion: v1 + kind: ConfigMap + metadata: + name: game-demo + data: + foo: bar + config.properties: |- + coordinator=true + http-server.https.enabled=true + log.properties: |- + =info ", ) .unwrap(); - let patches = parse_patches( + let object_overrides: ObjectOverrides = serde_yaml::from_str( " -apiVersion: v1 -kind: ConfigMap -metadata: - name: game-demo -data: - foo: overwritten - log.properties: |- - =info,tech.stackable=debug +objectOverrides: + - apiVersion: v1 + kind: ConfigMap + metadata: + name: game-demo + data: + foo: overwritten + log.properties: |- + =info,tech.stackable=debug ", ) - .unwrap(); + .expect("test input is valid YAML"); assert_eq!( cm.data.as_ref().unwrap(), @@ -377,7 +373,7 @@ data: ("log.properties".to_owned(), "=info".to_owned()), ]) ); - apply_patches(&mut cm, patches.iter()).unwrap(); + apply_patches(&mut cm, &object_overrides).unwrap(); assert_eq!( cm.data.as_ref().unwrap(), &BTreeMap::from([ @@ -398,30 +394,31 @@ data: fn secret_patched() { let mut secret: Secret = serde_yaml::from_str( " -apiVersion: v1 -kind: Secret -metadata: - name: dotfile-secret -stringData: - foo: bar -data: - raw: YmFyCg== # echo bar | base64 + apiVersion: v1 + kind: Secret + metadata: + name: dotfile-secret + stringData: + foo: bar + data: + raw: YmFyCg== # echo bar | base64 ", ) .unwrap(); - let patches = parse_patches( + let object_overrides: ObjectOverrides = serde_yaml::from_str( " -apiVersion: v1 -kind: Secret -metadata: - name: dotfile-secret -stringData: - foo: overwritten -data: - raw: b3ZlcndyaXR0ZW4K # echo overwritten | base64 +objectOverrides: + - apiVersion: v1 + kind: Secret + metadata: + name: dotfile-secret + stringData: + foo: overwritten + data: + raw: b3ZlcndyaXR0ZW4K # echo overwritten | base64 ", ) - .unwrap(); + .expect("test input is valid YAML"); assert_eq!( secret.string_data.as_ref().unwrap(), @@ -432,7 +429,7 @@ data: &BTreeMap::from([("raw".to_owned(), ByteString(b"bar\n".to_vec()))]) ); - apply_patches(&mut secret, patches.iter()).unwrap(); + apply_patches(&mut secret, &object_overrides).unwrap(); assert_eq!( secret.string_data.as_ref().unwrap(), &BTreeMap::from([("foo".to_owned(), "overwritten".to_owned()),]) @@ -447,41 +444,40 @@ data: fn cluster_scoped_object_patched() { let mut storage_class: StorageClass = serde_yaml::from_str( " -apiVersion: storage.k8s.io/v1 -kind: StorageClass -metadata: - name: low-latency - labels: - foo: original - annotations: - storageclass.kubernetes.io/is-default-class: \"false\" -provisioner: csi-driver.example-vendor.example + apiVersion: storage.k8s.io/v1 + kind: StorageClass + metadata: + name: low-latency + labels: + foo: original + annotations: + storageclass.kubernetes.io/is-default-class: \"false\" + provisioner: csi-driver.example-vendor.example ", ) .unwrap(); - let patches = parse_patches( + let object_overrides: ObjectOverrides = serde_yaml::from_str( " ---- -apiVersion: v1 -kind: ServiceAccount ---- -apiVersion: storage.k8s.io/v1 -kind: StorageClass -metadata: - name: low-latency - labels: - foo: overwritten - annotations: - new: annotation -provisioner: custom-provisioner ---- -foo: bar +objectOverrides: + - apiVersion: v1 + kind: ServiceAccount + - apiVersion: storage.k8s.io/v1 + kind: StorageClass + metadata: + name: low-latency + labels: + foo: overwritten + annotations: + new: annotation + provisioner: custom-provisioner + - foo: bar + - {} ", ) - .unwrap(); + .expect("test input is valid YAML"); assert_has_label(&storage_class, "foo", "original"); - apply_patches(&mut storage_class, patches.iter()).unwrap(); + apply_patches(&mut storage_class, &object_overrides).unwrap(); assert_has_label(&storage_class, "foo", "overwritten"); } diff --git a/crates/stackable-shared/src/lib.rs b/crates/stackable-shared/src/lib.rs index bb49166fe..767726d3d 100644 --- a/crates/stackable-shared/src/lib.rs +++ b/crates/stackable-shared/src/lib.rs @@ -2,7 +2,6 @@ //! workspace. pub mod crd; -pub mod patchinator; pub mod secret; pub mod time; pub mod yaml; From d87791f23e10c2d2420536ea42643ca44b7d061f Mon Sep 17 00:00:00 2001 From: Sebastian Bernauer Date: Tue, 11 Nov 2025 10:59:41 +0100 Subject: [PATCH 3/6] changelog --- crates/stackable-operator/CHANGELOG.md | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/crates/stackable-operator/CHANGELOG.md b/crates/stackable-operator/CHANGELOG.md index 53e8bbf3d..8b9a612fe 100644 --- a/crates/stackable-operator/CHANGELOG.md +++ b/crates/stackable-operator/CHANGELOG.md @@ -4,6 +4,21 @@ All notable changes to this project will be documented in this file. ## [Unreleased] +### Added + +- Support `objectOverrides` ([#1118]). + +### Changed + +- BREAKING: `ClusterResources` now requires the objects added to implement `DeepMerge`. + This is very likely a stackable-operator internal change, but technically breaking ([#1118]). + +### Removed + +- BREAKING: `ClusterResources` no longer derives `Eq` and `PartialEq` ([#1118]). + +[#1118]: https://github.com/stackabletech/operator-rs/pull/1118 + ## [0.100.3] - 2025-10-31 ### Changed From aaf74c27f532f29360b63af36218ce55f0ce811f Mon Sep 17 00:00:00 2001 From: Sebastian Bernauer Date: Tue, 11 Nov 2025 11:04:27 +0100 Subject: [PATCH 4/6] Add TODO for docs --- crates/stackable-operator/src/patchinator/crd.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/crates/stackable-operator/src/patchinator/crd.rs b/crates/stackable-operator/src/patchinator/crd.rs index 47b3c9f1d..3f2e604c5 100644 --- a/crates/stackable-operator/src/patchinator/crd.rs +++ b/crates/stackable-operator/src/patchinator/crd.rs @@ -7,6 +7,7 @@ use crate::utils::crds::raw_object_list_schema; #[derive(Clone, Debug, Deserialize, JsonSchema, Serialize)] #[serde(rename_all = "camelCase")] pub struct ObjectOverrides { + /// TODO docs #[serde(default)] #[schemars(schema_with = "raw_object_list_schema")] pub object_overrides: Vec, From 22b173273f50d800aaa2593d7bfa3d4d186e78d6 Mon Sep 17 00:00:00 2001 From: Sebastian Bernauer Date: Tue, 11 Nov 2025 12:29:52 +0100 Subject: [PATCH 5/6] Add a test for Listener merging --- .../crd/listener/listeners/v1alpha1_impl.rs | 108 +++++++++++++----- 1 file changed, 81 insertions(+), 27 deletions(-) diff --git a/crates/stackable-operator/src/crd/listener/listeners/v1alpha1_impl.rs b/crates/stackable-operator/src/crd/listener/listeners/v1alpha1_impl.rs index cf03f0522..4c2c5b602 100644 --- a/crates/stackable-operator/src/crd/listener/listeners/v1alpha1_impl.rs +++ b/crates/stackable-operator/src/crd/listener/listeners/v1alpha1_impl.rs @@ -1,3 +1,5 @@ +use k8s_openapi::{DeepMerge, merge_strategies}; + use crate::crd::listener::listeners::v1alpha1::{ Listener, ListenerIngress, ListenerPort, ListenerSpec, ListenerStatus, }; @@ -8,87 +10,139 @@ impl ListenerSpec { } } -impl k8s_openapi::DeepMerge for Listener { +impl DeepMerge for Listener { fn merge_from(&mut self, other: Self) { - k8s_openapi::DeepMerge::merge_from(&mut self.metadata, other.metadata); - k8s_openapi::DeepMerge::merge_from(&mut self.spec, other.spec); - k8s_openapi::DeepMerge::merge_from(&mut self.status, other.status); + DeepMerge::merge_from(&mut self.metadata, other.metadata); + DeepMerge::merge_from(&mut self.spec, other.spec); + DeepMerge::merge_from(&mut self.status, other.status); } } -impl k8s_openapi::DeepMerge for ListenerSpec { +impl DeepMerge for ListenerSpec { fn merge_from(&mut self, other: Self) { - k8s_openapi::DeepMerge::merge_from(&mut self.class_name, other.class_name); - k8s_openapi::merge_strategies::map::granular( + DeepMerge::merge_from(&mut self.class_name, other.class_name); + merge_strategies::map::granular( &mut self.extra_pod_selector_labels, other.extra_pod_selector_labels, |current_item, other_item| { - k8s_openapi::DeepMerge::merge_from(current_item, other_item); + DeepMerge::merge_from(current_item, other_item); }, ); - k8s_openapi::merge_strategies::list::map( + merge_strategies::list::map( &mut self.ports, other.ports, &[|lhs, rhs| lhs.name == rhs.name], |current_item, other_item| { - k8s_openapi::DeepMerge::merge_from(current_item, other_item); + DeepMerge::merge_from(current_item, other_item); }, ); - k8s_openapi::DeepMerge::merge_from( + DeepMerge::merge_from( &mut self.publish_not_ready_addresses, other.publish_not_ready_addresses, ); - todo!() } } -impl k8s_openapi::DeepMerge for ListenerStatus { +impl DeepMerge for ListenerStatus { fn merge_from(&mut self, other: Self) { - k8s_openapi::DeepMerge::merge_from(&mut self.service_name, other.service_name); - k8s_openapi::merge_strategies::list::map( + DeepMerge::merge_from(&mut self.service_name, other.service_name); + merge_strategies::list::map( &mut self.ingress_addresses, other.ingress_addresses, &[|lhs, rhs| lhs.address == rhs.address], |current_item, other_item| { - k8s_openapi::DeepMerge::merge_from(current_item, other_item); + DeepMerge::merge_from(current_item, other_item); }, ); - k8s_openapi::merge_strategies::map::granular( + merge_strategies::map::granular( &mut self.node_ports, other.node_ports, |current_item, other_item| { - k8s_openapi::DeepMerge::merge_from(current_item, other_item); + DeepMerge::merge_from(current_item, other_item); }, ); } } -impl k8s_openapi::DeepMerge for ListenerIngress { +impl DeepMerge for ListenerIngress { fn merge_from(&mut self, other: Self) { - k8s_openapi::DeepMerge::merge_from(&mut self.address, other.address); + DeepMerge::merge_from(&mut self.address, other.address); self.address_type = other.address_type; - k8s_openapi::merge_strategies::map::granular( + merge_strategies::map::granular( &mut self.ports, other.ports, |current_item, other_item| { - k8s_openapi::DeepMerge::merge_from(current_item, other_item); + DeepMerge::merge_from(current_item, other_item); }, ); } } -impl k8s_openapi::DeepMerge for ListenerPort { +impl DeepMerge for ListenerPort { fn merge_from(&mut self, other: Self) { - k8s_openapi::DeepMerge::merge_from(&mut self.name, other.name); - k8s_openapi::DeepMerge::merge_from(&mut self.port, other.port); - k8s_openapi::DeepMerge::merge_from(&mut self.protocol, other.protocol); + DeepMerge::merge_from(&mut self.name, other.name); + DeepMerge::merge_from(&mut self.port, other.port); + DeepMerge::merge_from(&mut self.protocol, other.protocol); } } #[cfg(test)] mod tests { + use super::*; + #[test] fn deep_merge_listener() { - todo!("Add some basic tests for merging"); + let mut base: ListenerSpec = serde_yaml::from_str( + " +className: my-listener-class +extraPodSelectorLabels: + foo: bar +ports: + - name: http + port: 8080 + protocol: http + - name: https + port: 8080 + protocol: https +# publishNotReadyAddresses defaults to true +", + ) + .unwrap(); + + let patch: ListenerSpec = serde_yaml::from_str( + " +className: custom-listener-class +extraPodSelectorLabels: + foo: overridden + extra: label +ports: + - name: https + port: 8443 +publishNotReadyAddresses: false +", + ) + .unwrap(); + + base.merge_from(patch); + + let expected: ListenerSpec = serde_yaml::from_str( + " +className: custom-listener-class +extraPodSelectorLabels: + foo: overridden + extra: label +ports: + - name: http + port: 8080 + protocol: http + - name: https + port: 8443 # overridden + protocol: https +publishNotReadyAddresses: false +", + ) + .unwrap(); + + assert_eq!(base, expected); } } From ec5b8820af558e002a93601f18f7d92807a465f0 Mon Sep 17 00:00:00 2001 From: Sebastian Bernauer Date: Tue, 11 Nov 2025 13:10:18 +0100 Subject: [PATCH 6/6] Fix doctests --- crates/stackable-operator/src/cluster_resources.rs | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/crates/stackable-operator/src/cluster_resources.rs b/crates/stackable-operator/src/cluster_resources.rs index f02fdf2d2..087965cdd 100644 --- a/crates/stackable-operator/src/cluster_resources.rs +++ b/crates/stackable-operator/src/cluster_resources.rs @@ -340,6 +340,7 @@ impl ClusterResource for Deployment { /// use serde::{Deserialize, Serialize}; /// use stackable_operator::client::Client; /// use stackable_operator::cluster_resources::{self, ClusterResourceApplyStrategy, ClusterResources}; +/// use stackable_operator::patchinator::ObjectOverrides; /// use stackable_operator::product_config_utils::ValidatedRoleConfigByPropertyKind; /// use stackable_operator::role_utils::Role; /// use std::sync::Arc; @@ -356,7 +357,10 @@ impl ClusterResource for Deployment { /// plural = "AppClusters", /// namespaced, /// )] -/// struct AppClusterSpec {} +/// struct AppClusterSpec { +/// #[serde(flatten)] +/// pub object_overrides: ObjectOverrides, +/// } /// /// enum Error { /// CreateClusterResources { @@ -379,6 +383,7 @@ impl ClusterResource for Deployment { /// CONTROLLER_NAME, /// &app.object_ref(&()), /// ClusterResourceApplyStrategy::Default, +/// &app.spec.object_overrides, /// ) /// .map_err(|source| Error::CreateClusterResources { source })?; ///