diff --git a/rework_pysatl_mpest/initializers/__init__.py b/rework_pysatl_mpest/initializers/__init__.py index 685af9fa..4a6e912d 100644 --- a/rework_pysatl_mpest/initializers/__init__.py +++ b/rework_pysatl_mpest/initializers/__init__.py @@ -9,42 +9,6 @@ initializers provide good starting points for EM algorithm and other optimization methods, helping to avoid poor local optima and improving convergence. - -**Usage Example** - -.. code-block:: python - - >>> from rework_pysatl_mpest import Exponential - >>> import numpy as np - >>> from sklearn.cluster import KMeans - >>> from rework_pysatl_mpest.initializers import ClusterizeInitializer - >>> from rework_pysatl_mpest.initializers import ClusterMatchStrategy, EstimationStrategy - - >>> # Create initializer with KMeans clustering - >>> initializer_cluster = ClusterizeInitializer( - ... is_accurate=True, - ... is_soft=False, - ... clusterizer=KMeans(n_clusters=3) - ... ) - - >>> # Create distribution models to initialize - >>> distributions = [Exponential(loc=0.0, rate=0.1), - >>>Exponential(loc=5.0, rate=0.05), Exponential(loc=10.0, rate=0.01)] - - >>> # Generate sample data - >>> X = np.linspace(0.01, 25.0, 300) - - >>> # Perform initialization - >>> mixture_model = initializer_cluster.perform( - ... X=X, - ... dists=distributions, - ... cluster_match_strategy=ClusterMatchStrategy.AKAIKE, - ... estimation_strategies=[EstimationStrategy.QFUNCTION] * len(distributions) - ... ) - - >>> # The mixture model is now initialized with estimated parameters - >>> print(f"Number of components: {len(mixture_model.components)}") - >>> print(f"Weights: {mixture_model.weights}") """ __author__ = "Viktor Khanukaev" @@ -52,21 +16,14 @@ __license__ = "SPDX-License-Identifier: MIT" from ._estimation_strategies.q_function import q_function_strategy, q_function_strategy_exponential -from .cluster_match_strategy import ( - match_clusters_for_models_akaike, - match_clusters_for_models_log_likelihood, -) from .clusterize_initializer import ClusterizeInitializer from .initializer import Initializer -from .strategies import ClusterMatchStrategy, EstimationStrategy +from .strategies import EstimationStrategy __all__ = [ - "ClusterMatchStrategy", "ClusterizeInitializer", "EstimationStrategy", "Initializer", - "match_clusters_for_models_akaike", - "match_clusters_for_models_log_likelihood", "q_function_strategy", "q_function_strategy_exponential", ] diff --git a/rework_pysatl_mpest/initializers/cluster_match_strategy.py b/rework_pysatl_mpest/initializers/cluster_match_strategy.py index fb183f33..15b47f0e 100644 --- a/rework_pysatl_mpest/initializers/cluster_match_strategy.py +++ b/rework_pysatl_mpest/initializers/cluster_match_strategy.py @@ -8,269 +8,299 @@ from copy import copy from itertools import permutations -from typing import Callable +from typing import Any, Callable, TypedDict import numpy as np +from scipy.optimize import linear_sum_assignment +from rework_pysatl_mpest.core import MixtureModel from rework_pysatl_mpest.distributions.continuous_dist import ContinuousDistribution +from rework_pysatl_mpest.initializers.strategies import MatchingMethod, ScoringMethod from rework_pysatl_mpest.optimizers import Optimizer, ScipyNelderMead +MatchingResult = tuple[list[ContinuousDistribution], list[dict[str, float]], list[float]] +Context = dict[str, Any] +ScoreFunc = Callable[[MixtureModel | ContinuousDistribution, np.ndarray, np.ndarray | None], float] -def match_clusters_for_models_log_likelihood( - models: list[ContinuousDistribution], - X: np.ndarray, - H: np.ndarray, - estimation_strategies: list[Callable], - min_samples: int = 10, - optimizer: Optimizer = ScipyNelderMead(), -) -> tuple[list[ContinuousDistribution], list[dict[str, float]], list[float]]: - """Matches clusters to models using weighted log-likelihood criteria. - This function assigns each distribution model to the cluster that maximizes - the weighted log-likelihood score. The assignment is performed sequentially, - with each model selecting the best available cluster. +class FitResult(TypedDict): + """A TypedDict to represent the result of a single model-cluster fit.""" - Parameters - ---------- - models : list[ContinuousDistribution] - List of distribution models to be matched with clusters. - X : np.ndarray - Input data points used for parameter estimation. - H : np.ndarray - Weight matrix where ``H[i, k]`` represents the probability that data point ``i`` - belongs to cluster ``k``. - estimation_strategies : list[Callable] - List of estimation functions for each model, used to estimate parameters - for a given cluster. - min_samples : int, optional - Minimum number of samples required for a cluster to be considered valid. - Default is 10. - optimizer : Optimizer - Optimizer that will be used in estimation strategies. - By default, ScipyNelderMead. + model: ContinuousDistribution + params: dict[str, float] + score: float + weight: float - Returns - ------- - tuple[list[ContinuousDistribution], list[dict[str, float]], list[float]] - A tuple containing: - - - The original list of models - - List of parameter dictionaries for each model - - List of weights for each model - - Raises - ------ - ValueError - If the sum of weights in H matrix is not equal to 1 for each data point, - or if the number of estimation functions doesn't match the number of models. - - Notes - ----- - The function performs the following steps: - - 1. Validates input constraints - 2. Identifies valid clusters with sufficient samples - 3. Sequentially assigns each model to the best available cluster - 4. Estimates parameters using the provided estimation functions - 5. Normalizes the resulting weights - - If insufficient valid clusters are found, returns default parameters and equal weights. - """ +def _validate_clusters_distributions( + H: np.ndarray, models_count: int, estimation_strategies_count: int, min_samples: int +) -> tuple[list[int], list[float]]: + """Validates clusters and models for further comparison""" if not np.allclose(np.sum(H, axis=1), 1, atol=1e-10): raise ValueError("Sum of H matrix weights must be equal to 1") - X = X.flatten() n_clusters = H.shape[1] - n_models = len(models) - if len(estimation_strategies) != n_models: + if estimation_strategies_count != models_count: raise ValueError("Number of estimation functions must match number of models") - updated_params_list = [] - model_weights = [] - - cluster_weights = np.sum(H, axis=0) + cluster_weights: list[float] = np.sum(H, axis=0) valid_clusters = [k for k in range(n_clusters) if cluster_weights[k] >= min_samples] - if len(valid_clusters) != n_models: - default_params: list[dict] = [{} for _ in range(n_models)] - equal_weights = [1.0 / n_models] * n_models - return models, default_params, equal_weights + if len(valid_clusters) != models_count: + return [], [] + return valid_clusters, cluster_weights - used_clusters = set() - for i, (model, estimation_func) in enumerate(zip(models, estimation_strategies)): - best_score = -np.inf - best_params = {} - best_cluster_weight = 0.0 - best_cluster = None - temp_model = copy(model) - default_params_names, default_params_values = ( - list(temp_model.params), - temp_model.get_params_vector(list(temp_model.params)), - ) +def _estimate_and_score_component( + model: ContinuousDistribution, + estimation_func: Callable, + score_func: ScoreFunc, + X: np.ndarray, + H_k: np.ndarray, + optimizer: Optimizer, +) -> FitResult: + """Estimates parameters for a model-cluster pair and computes its score""" + temp_model = copy(model) + new_params: dict[str, float] = estimation_func(temp_model, X, H_k, optimizer) + temp_model.set_params_from_vector(list(new_params.keys()), list(new_params.values())) - for k in valid_clusters: - if k in used_clusters: - continue - H_k = H[:, k] + score = score_func(temp_model, X, H_k) - new_params = estimation_func(temp_model, X, H_k, optimizer) - param_names = new_params.keys() - param_values = new_params.values() - temp_model.set_params_from_vector(param_names, param_values) + return {"model": temp_model, "params": new_params, "score": score, "weight": -1.0} - log_probs = np.clip(temp_model.lpdf(X), -1e9, -1e-9) - weighted_log_likelihood = np.sum(H_k * log_probs) - effective_n = cluster_weights[k] - score = weighted_log_likelihood / effective_n +def _calculate_component_log_likelihood(model: ContinuousDistribution, X: np.ndarray, H_k: np.ndarray) -> float: + """Calculates the weighted log-likelihood for a single component""" + log_probs = np.clip(model.lpdf(X), -1e9, -1e-9) + return np.sum(H_k * log_probs) - if score > best_score: - best_score = score - best_params = new_params - best_cluster_weight = cluster_weights[k] / len(X) - best_cluster = k - temp_model.set_params_from_vector(default_params_names, default_params_values) +def _calculate_mixture_log_likelihood(model: MixtureModel, X: np.ndarray) -> float: + """Calculates the total log-likelihood for a mixture model""" + X_flattened = np.asarray(X) + dim_const = 2 + if X_flattened.ndim == dim_const and X_flattened.shape[1] == 1: + X_flattened = X_flattened.flatten() - used_clusters.add(best_cluster) - updated_params_list.append(best_params) - model_weights.append(float(best_cluster_weight)) + return np.sum(model.loglikelihood(X_flattened)) - return models, updated_params_list, model_weights +def _calculate_component_aic(model: ContinuousDistribution, X: np.ndarray, H_k: np.ndarray) -> float: + """Calculates AIC for a single component based on weighted log-likelihood""" + weighted_log_likelihood = _calculate_component_log_likelihood(model, X, H_k) + k_params = len(model.params) + return 2 * k_params - 2 * weighted_log_likelihood -def match_clusters_for_models_akaike( + +def _calculate_mixture_aic(model: MixtureModel, X: np.ndarray) -> float: + """Calculates AIC for the entire mixture model""" + log_likelihood = _calculate_mixture_log_likelihood(model, X) + k_params = sum(len(dist.params) for dist in model.components) + + k_params += model.n_components - 1 + + return 2 * k_params - 2 * log_likelihood + + +def _precompute_fits(context: Context) -> list[list[FitResult]]: + """ + Pre-computes model fits + """ + models = context["models"] + estimation_strategies = context["estimation_strategies"] + valid_clusters = context["valid_clusters"] + score_func_component = context["score_func_component"] + X, H, optimizer = context["X"], context["H"], context["optimizer"] + cluster_weights = context["cluster_weights"] + n_samples = len(X) + + cached_fits: list[list[FitResult]] = [] + computation_cache: dict[tuple, FitResult] = {} + + for model, est_func in zip(models, estimation_strategies): + row: list[FitResult] = [] + for k in valid_clusters: + cache_key = (model.__class__, est_func, k) + + if cache_key not in computation_cache: + fit_result = _estimate_and_score_component(model, est_func, score_func_component, X, H[:, k], optimizer) + fit_result["weight"] = cluster_weights[k] / n_samples + computation_cache[cache_key] = fit_result + + row.append(computation_cache[cache_key]) + cached_fits.append(row) + + return cached_fits + + +def _match_greedy(context: Context) -> MatchingResult: + """sequentially assign each model to its best available cluster""" + updated_params_list: list[dict[str, float]] = [] + model_weights: list[float] = [] + used_clusters = set() + + for model, estimation_func in zip(context["models"], context["estimation_strategies"]): + best_score = np.inf + best_params: dict[str, float] = {} + best_cluster_weight = 0.0 + best_cluster_idx = -1 + + for k in context["valid_clusters"]: + if k in used_clusters: + continue + + fit_result = _estimate_and_score_component( + model, + estimation_func, + context["score_func_component"], + context["X"], + context["H"][:, k], + context["optimizer"], + ) + score: float = fit_result["score"] + if score < best_score: + best_score = fit_result["score"] + best_params = fit_result["params"] + best_cluster_weight = context["cluster_weights"][k] / len(context["X"]) + best_cluster_idx = k + + if best_cluster_idx != -1: + used_clusters.add(best_cluster_idx) + updated_params_list.append(best_params) + model_weights.append(float(best_cluster_weight)) + + return context["models"], updated_params_list, model_weights + + +def _match_hungarian(context: Context) -> MatchingResult: + """find optimal assignment that minimizes the total score""" + models = context["models"] + cached_fits = _precompute_fits(context) + + cost_matrix = np.array( + [[cached_fits[i][j]["score"] for j in range(len(cached_fits[0]))] for i in range(len(models))] + ) + row_ind, col_ind = linear_sum_assignment(cost_matrix) + + assigned_models = [models[i] for i in row_ind] + best_params = [cached_fits[i][j]["params"] for i, j in zip(row_ind, col_ind)] + best_weights = [cached_fits[i][j]["weight"] for i, j in zip(row_ind, col_ind)] + + return assigned_models, best_params, best_weights + + +def _match_permutations(context: Context) -> MatchingResult: + """find assignment that minimizes the total mixture score""" + models, X, score_func_mixture = context["models"], context["X"], context["score_func_mixture"] + cached_fits = _precompute_fits(context) + n_models, n_valid_clusters = len(models), len(cached_fits[0]) + + best_total_score = np.inf + best_params: list[dict[str, float]] = [] + best_weights: list[float] = [] + best_model_order: list[ContinuousDistribution] = [] + + for model_perm_indices in permutations(range(n_models)): + for cluster_perm_indices in permutations(range(n_valid_clusters), n_models): + perm_models = [ + cached_fits[model_perm_indices[i]][cluster_perm_indices[i]]["model"] for i in range(n_models) + ] + perm_params = [ + cached_fits[model_perm_indices[i]][cluster_perm_indices[i]]["params"] for i in range(n_models) + ] + perm_weights = [ + float(cached_fits[model_perm_indices[i]][cluster_perm_indices[i]]["weight"]) for i in range(n_models) + ] + + normalized_weights = [float(w) / sum(perm_weights) for w in perm_weights] + temp_mixture = MixtureModel(components=perm_models, weights=normalized_weights) + total_score = score_func_mixture(temp_mixture, X) + + if total_score < best_total_score: + best_total_score = total_score + best_params = perm_params + best_weights = normalized_weights + best_model_order = [models[i] for i in model_perm_indices] + + return best_model_order, best_params, best_weights + + +_MATCHING_METHOD: dict[MatchingMethod, Callable] = { + MatchingMethod.GREEDY: _match_greedy, + MatchingMethod.HUNGARIAN: _match_hungarian, + MatchingMethod.PERMUTATIONS: _match_permutations, +} + +_SCORING_METHOD: dict[ScoringMethod, tuple[Callable, Callable]] = { + ScoringMethod.AIC: (_calculate_component_aic, _calculate_mixture_aic), + ScoringMethod.LIKELIHOOD: ( + lambda m, X, H_k: -_calculate_component_log_likelihood(m, X, H_k), + _calculate_mixture_log_likelihood, + ), +} + + +def match_clusters_for_models( models: list[ContinuousDistribution], X: np.ndarray, H: np.ndarray, estimation_strategies: list[Callable], + method: MatchingMethod, + score_func: ScoringMethod, min_samples: int = 10, optimizer: Optimizer = ScipyNelderMead(), -) -> tuple[list[ContinuousDistribution], list[dict[str, float]], list[float]]: - """Matches clusters to models using Akaike Information Criterion (AIC). - - This function evaluates all possible permutations of cluster-model assignments - and selects the combination that minimizes the total AIC score. +) -> MatchingResult: + """ + Matches clusters to models using a specified strategy and scoring function. Parameters ---------- models : list[ContinuousDistribution] - List of distribution models to be matched with clusters. + List of distributions X : np.ndarray - Input data points used for parameter estimation. + Input data points H : np.ndarray - Weight matrix where ``H[i, k]`` represents the probability that data point ``i`` - belongs to cluster ``k``. + Weight matrix where H[i, k] is the probability of point i in cluster k estimation_strategies : list[Callable] - List of estimation functions for each model, used to estimate parameters - for a given cluster. + Estimation functions for each model + method : MatchingMethod, optional + The cluster matching strategy to use. Default is MatchingMethod.GREEDY + score_func : ScoringMethod, optional + The scoring criterion to use for optimization. Can be AIC or LIKELIHOOD + Default is ScoringMethod.AIC min_samples : int, optional - Minimum number of samples required for a cluster to be considered valid. - Default is 10. - optimizer : Optimizer - Optimizer that will be used in estimation strategies. - By default, ScipyNelderMead. + Minimum samples for a cluster to be valid. Default is 10 + optimizer : Optimizer, optional + Optimizer used in estimation strategies. Default is ScipyNelderMead Returns ------- - tuple[list[ContinuousDistribution], list[dict[str, float]], list[float]] - A tuple containing: - - - The original list of models - - List of parameter dictionaries for each model - - List of weights for each model - - Raises - ------ - ValueError - If the sum of weights in H matrix is not equal to 1 for each data point, - or if the number of estimation functions doesn't match the number of models. - - Notes - ----- - The function performs the following steps: - - 1. Validates input constraints - 2. Computes AIC scores for all possible model-cluster combinations - 3. Evaluates all permutations to find the assignment with minimum total AIC - 4. Returns the best parameter assignment and normalized weights - - AIC is calculated as: ``2 * k - 2 * log_likelihood``, where ``k`` is the number of parameters. + MatchingResult + A tuple containing (ordered models, parameters, weights) """ - - if not np.allclose(np.sum(H, axis=1), 1, atol=1e-10): - raise ValueError("Sum of H matrix weights must be equal to 1") - - n_clusters = H.shape[1] n_models = len(models) - - if len(estimation_strategies) != n_models: - raise ValueError("Number of estimation functions must match number of models") - - aic_scores_dict = {} - - cluster_weights = np.sum(H, axis=0) - valid_clusters = [k for k in range(n_clusters) if cluster_weights[k] >= min_samples] - - if len(valid_clusters) != n_models: - default_params: list[dict] = [{} for _ in range(n_models)] - equal_weights = [1.0 / n_models] * n_models - return models, default_params, equal_weights - - for i, (model, estimation_func) in enumerate(zip(models, estimation_strategies)): - temp_model = copy(model) - default_params_names, default_params_values = ( - list(temp_model.params), - temp_model.get_params_vector(list(temp_model.params)), - ) - for k in valid_clusters: - H_k = H[:, k] - - new_params = estimation_func(temp_model, X, H_k, optimizer) - param_names = new_params.keys() - param_values = new_params.values() - temp_model.set_params_from_vector(param_names, param_values) - - log_probs = np.clip(temp_model.lpdf(X), -1e9, -1e-9) - weighted_log_likelihood = np.sum(H_k * log_probs) - - k_params = len(model.params) - - aic_score = 2 * k_params - 2 * weighted_log_likelihood - - key = f"{i}_{k}" - aic_scores_dict[key] = { - "aic_score": aic_score, - "params": new_params, - "cluster_weight": cluster_weights[k] / len(X), - "model_idx": i, - "cluster_idx": k, - } - temp_model.set_params_from_vector(default_params_names, default_params_values) - - best_total_aic = np.inf - best_params_assignment = [] - best_weights_assignment = [] - - for cluster_perm in permutations(valid_clusters, n_models): - total_aic = 0 - params_assignment = [] - weights_assignment = [] - valid_assignment = True - - for i, cluster_idx in enumerate(cluster_perm): - key = f"{i}_{cluster_idx}" - data = aic_scores_dict[key] - total_aic += data["aic_score"] - params_assignment.append(data["params"]) - weights_assignment.append(data["cluster_weight"]) - - if valid_assignment and total_aic < best_total_aic: - best_total_aic = total_aic - best_params_assignment = params_assignment - best_weights_assignment = weights_assignment - - return models, best_params_assignment, best_weights_assignment + valid_clusters, cluster_weights = _validate_clusters_distributions( + H, n_models, len(estimation_strategies), min_samples + ) + + if not valid_clusters: + default_params: list[dict[str, float]] = [{} for _ in range(n_models)] + return models, default_params, [1.0 / n_models] * n_models + + method_func = _MATCHING_METHOD[method] + score_component_func, score_mixture_func = _SCORING_METHOD[score_func] + + context: Context = { + "models": models, + "X": X, + "H": H, + "estimation_strategies": estimation_strategies, + "optimizer": optimizer, + "valid_clusters": valid_clusters, + "cluster_weights": cluster_weights, + "score_func_component": score_component_func, + "score_func_mixture": score_mixture_func, + } + + return method_func(context=context) diff --git a/rework_pysatl_mpest/initializers/clusterize_initializer.py b/rework_pysatl_mpest/initializers/clusterize_initializer.py index 70388206..4aa6a647 100644 --- a/rework_pysatl_mpest/initializers/clusterize_initializer.py +++ b/rework_pysatl_mpest/initializers/clusterize_initializer.py @@ -16,12 +16,9 @@ from rework_pysatl_mpest.core.mixture import MixtureModel from rework_pysatl_mpest.distributions.continuous_dist import ContinuousDistribution from rework_pysatl_mpest.initializers._estimation_strategies.q_function import q_function_strategy -from rework_pysatl_mpest.initializers.cluster_match_strategy import ( - match_clusters_for_models_akaike, - match_clusters_for_models_log_likelihood, -) +from rework_pysatl_mpest.initializers.cluster_match_strategy import match_clusters_for_models from rework_pysatl_mpest.initializers.initializer import Initializer -from rework_pysatl_mpest.initializers.strategies import ClusterMatchStrategy, EstimationStrategy +from rework_pysatl_mpest.initializers.strategies import EstimationStrategy, MatchingMethod, ScoringMethod from rework_pysatl_mpest.optimizers import Optimizer from rework_pysatl_mpest.optimizers.scipy_nelder_mead import ScipyNelderMead @@ -41,8 +38,6 @@ class ClusterizeInitializer(Initializer): Mapping of cluster matching strategies to their implementation functions. n_components : Optional[int] Number of mixture components to initialize. - cluster_match_strategy : ClusterMatchStrategy - Strategy for matching clusters to distribution models. estimation_strategies : list[EstimationStrategy] List of estimation strategies for each distribution model. models : list[ContinuousDistribution] @@ -93,12 +88,6 @@ class ClusterizeInitializer(Initializer): _estimation_strategies: ClassVar[Mapping[EstimationStrategy, Callable]] = MappingProxyType( {EstimationStrategy.QFUNCTION: q_function_strategy} ) - _cluster_match_strategies: ClassVar[Mapping[ClusterMatchStrategy, Callable]] = MappingProxyType( - { - ClusterMatchStrategy.LIKELIHOOD: match_clusters_for_models_log_likelihood, - ClusterMatchStrategy.AKAIKE: match_clusters_for_models_akaike, - } - ) def __init__(self, is_accurate: bool, is_soft: bool, clusterizer: Any): """Initializes the cluster-based initializer. @@ -119,7 +108,8 @@ def __init__(self, is_accurate: bool, is_soft: bool, clusterizer: Any): self.is_accurate = is_accurate self.clusterizer = clusterizer self.n_components: Optional[int] = None - self.cluster_match_strategy: ClusterMatchStrategy = ClusterMatchStrategy.LIKELIHOOD + self.method: MatchingMethod = MatchingMethod.GREEDY + self.score_func: ScoringMethod = ScoringMethod.LIKELIHOOD self.estimation_strategies: list[EstimationStrategy] = [] self.models: list[ContinuousDistribution] = [] @@ -216,19 +206,26 @@ def _accurate_init( if len(self.estimation_strategies) != len(self.models): raise ValueError("Count of models must match count of estimation strategies") - cluster_match_func = self._cluster_match_strategies[self.cluster_match_strategy] - estimation_funcs = [self._estimation_strategies[strategy] for strategy in self.estimation_strategies] - distributions, params, weights = cluster_match_func(self.models, X, H, estimation_funcs) - if not np.all(params): + distributions, params, weights = match_clusters_for_models( + models=self.models, + X=X, + H=H, + estimation_strategies=estimation_funcs, + method=self.method, + score_func=self.score_func, + min_samples=self.MIN_SAMPLES, + optimizer=optimizer, + ) + if not all(params): return self._fast_init(X, H, optimizer) new_distributions = [] for i, dist in enumerate(distributions): params_names = params[i].keys() params_values = params[i].values() - dist.set_params_from_vector(params_names, params_values) + dist.set_params_from_vector(list(params_names), list(params_values)) new_distributions.append(dist) return new_distributions, weights @@ -285,7 +282,8 @@ def perform( self, X: ArrayLike, dists: list[ContinuousDistribution], - cluster_match_strategy: ClusterMatchStrategy, + method: MatchingMethod, + score_func: ScoringMethod, estimation_strategies: list[EstimationStrategy], optimizer: Optimizer = ScipyNelderMead(), ) -> MixtureModel: @@ -297,8 +295,6 @@ def perform( Input data points for initialization. dists : list[ContinuousDistribution] List of distribution models to initialize. - cluster_match_strategy : ClusterMatchStrategy - Strategy for matching clusters to distribution models. estimation_strategies : list[EstimationStrategy] List of estimation strategies for each distribution model. optimizer : Optimizer @@ -320,10 +316,13 @@ def perform( 5. Returns the initialized mixture model """ X = np.asarray(X, dtype=np.float64) + if X.ndim == 1: + X = X.reshape(-1, 1) self.models = dists self.n_components = len(dists) H = self._clusterize(X, self.clusterizer) - self.cluster_match_strategy = cluster_match_strategy + self.method = method + self.score_func = score_func self.estimation_strategies = estimation_strategies if self.is_accurate: diff --git a/rework_pysatl_mpest/initializers/initializer.py b/rework_pysatl_mpest/initializers/initializer.py index 217d41bb..c2d13bf0 100644 --- a/rework_pysatl_mpest/initializers/initializer.py +++ b/rework_pysatl_mpest/initializers/initializer.py @@ -11,7 +11,7 @@ from numpy.typing import ArrayLike from rework_pysatl_mpest.distributions.continuous_dist import ContinuousDistribution -from rework_pysatl_mpest.initializers.strategies import ClusterMatchStrategy, EstimationStrategy +from rework_pysatl_mpest.initializers.strategies import EstimationStrategy, MatchingMethod, ScoringMethod class Initializer(ABC): @@ -52,7 +52,8 @@ def perform( self, X: ArrayLike, dists: list[ContinuousDistribution], - cluster_match_info: ClusterMatchStrategy, + cluster_match_info: MatchingMethod, + score_function: ScoringMethod, estimation_info: list[EstimationStrategy], ): """Performs initialization of mixture model parameters. diff --git a/rework_pysatl_mpest/initializers/strategies.py b/rework_pysatl_mpest/initializers/strategies.py index 1cc4f1f5..8f520616 100644 --- a/rework_pysatl_mpest/initializers/strategies.py +++ b/rework_pysatl_mpest/initializers/strategies.py @@ -42,52 +42,12 @@ class EstimationStrategy(Enum): QFUNCTION = auto() -class ClusterMatchStrategy(Enum): - """Enumeration of strategies for matching clusters to distribution models. +class MatchingMethod(Enum): + GREEDY = auto() + HUNGARIAN = auto() + PERMUTATIONS = auto() - This enumeration defines the available methods for assigning clusters - (identified by clustering algorithms) to specific distribution models - during mixture model initialization. - - Attributes - ---------- - LIKELIHOOD : ClusterMatchStrategy - Uses weighted log-likelihood criteria to match clusters to models. - Each model is sequentially assigned to the cluster that maximizes - its weighted log-likelihood score. - - AKAIKE : ClusterMatchStrategy - Uses Akaike Information Criterion (AIC) to find the optimal assignment - between clusters and models. Evaluates all possible permutations and - selects the combination that minimizes the total AIC score. - - Notes - ----- - **LIKELIHOOD Strategy** - - - Sequential greedy assignment - - Computationally efficient - - May find locally optimal but not globally optimal assignments - - Uses normalized weighted log-likelihood as selection criteria - - **AKAIKE Strategy** - - - Evaluates all possible cluster-model permutations - - Finds globally optimal assignment (with respect to AIC) - - Computationally more expensive but provides better results - - Balances model fit and complexity through AIC penalty - - **Comparison** - - - LIKELIHOOD: Faster, suitable for large numbers of components - - AKAIKE: More accurate, recommended for smaller numbers of components - - Choice depends on computational constraints and quality requirements - - **Future Extensions** - - Additional strategies that could be added: - - BAYESIAN: Using Bayesian Information Criterion (BIC) - """ +class ScoringMethod(Enum): + AIC = auto() LIKELIHOOD = auto() - AKAIKE = auto() diff --git a/rework_tests/unit/test_initializers/match_cluster_strategies/test_akaike.py b/rework_tests/unit/test_initializers/match_cluster_strategies/test_akaike.py deleted file mode 100644 index d35781ee..00000000 --- a/rework_tests/unit/test_initializers/match_cluster_strategies/test_akaike.py +++ /dev/null @@ -1,220 +0,0 @@ -"""A module that provides tests for cluster match strategy with Akaike information criterion""" - -__author__ = "Viktor Khanukaev" -__copyright__ = "Copyright (c) 2025 PySATL project" -__license__ = "SPDX-License-Identifier: MIT" - -from unittest.mock import Mock - -import numpy as np -import pytest -from rework_pysatl_mpest.distributions.continuous_dist import ContinuousDistribution -from rework_pysatl_mpest.initializers.cluster_match_strategy import ( - match_clusters_for_models_akaike, -) - -COMPARISON_CONSTANT = 1e-10 - - -class TestMatchClustersForModelsAkaike: - @pytest.fixture - def mock_models(self, n=2): - models = [Mock(spec=ContinuousDistribution) for _ in range(n)] - for model in models: - model.params = {"param1", "param2"} - model.set_params_from_vector = Mock() - model.lpdf = Mock(return_value=np.array([-1.0, -2.0, -3.0])) - return models - - @pytest.fixture - def estimation_info(self, n=2): - funcs = [Mock() for _ in range(n)] - for f in funcs: - f.return_value = {"param1": 1.5, "param2": 2.5} - return funcs - - @pytest.fixture - def X(self): - return np.array([1.0, 2.0, 3.0]) - - @pytest.fixture - def H_valid(self): - H_raw = np.array([[0.8, 0.2], [0.7, 0.3], [0.9, 0.1]]) - return H_raw / H_raw.sum(axis=1, keepdims=True) - - def test_H_sum_not_1_raises(self, mock_models, estimation_info, X): - H_invalid = np.array([[0.8, 0.1], [0.7, 0.2], [0.9, 0.05]]) - with pytest.raises(ValueError, match="Sum of H matrix weights must be equal to 1"): - match_clusters_for_models_akaike(mock_models, X, H_invalid, estimation_info, min_samples=1) - - def test_estimation_info_length_mismatch(self, mock_models, estimation_info, X, H_valid): - with pytest.raises(ValueError, match="Number of estimation functions must match number of models"): - match_clusters_for_models_akaike(mock_models, X, H_valid, estimation_info[:1], min_samples=1) - - def test_insufficient_valid_clusters(self, mock_models, estimation_info, X): - H_low = np.array([[0.999, 0.001], [0.998, 0.002], [0.997, 0.003]]) - H_norm = H_low / H_low.sum(axis=1, keepdims=True) - models, params, weights = match_clusters_for_models_akaike( - mock_models, X, H_norm, estimation_info, min_samples=10 - ) - assert all(p == {} for p in params) - assert all(abs(w - 0.5) < COMPARISON_CONSTANT for w in weights) - - def test_no_valid_clusters_high_min_samples(self, mock_models, estimation_info, X): - H = np.array([[0.999, 0.001], [0.998, 0.002], [0.997, 0.003]]) - H_norm = H / H.sum(axis=1, keepdims=True) - models, params, weights = match_clusters_for_models_akaike( - mock_models, X, H_norm, estimation_info, min_samples=100 - ) - assert all(p == {} for p in params) - assert all(abs(w - 0.5) < COMPARISON_CONSTANT for w in weights) - - def test_permutation_logic(self, mock_models, estimation_info, X): - call_count = 0 - - def lpdf_side_effect(): - nonlocal call_count - call_count += 1 - if call_count in (1, 4): - return np.array([-0.1, -0.2, -0.3]) - else: - return np.array([-5.0, -6.0, -7.0]) - - len_models = len(mock_models) - mock_models[0].lpdf.side_effect = lambda _: lpdf_side_effect() - mock_models[1].lpdf.side_effect = lambda _: lpdf_side_effect() - - H = np.array([[0.9, 0.1], [0.8, 0.2], [0.7, 0.3]]) - H_norm = H / H.sum(axis=1, keepdims=True) - - models, params, weights = match_clusters_for_models_akaike( - mock_models, X, H_norm, estimation_info, min_samples=1 - ) - assert len(params) == len_models - assert len(weights) == len_models - assert abs(sum(weights) - 1.0) < COMPARISON_CONSTANT - - def test_different_param_counts(self): - model1 = Mock(spec=ContinuousDistribution) - model1.params = {"a", "b", "c"} - model1.set_params_from_vector = Mock() - model1.lpdf.return_value = np.array([-0.5, -1.0, -1.5]) - - model2 = Mock(spec=ContinuousDistribution) - model2.params = {"x", "y"} - model2.set_params_from_vector = Mock() - model2.lpdf.return_value = np.array([-1.0, -2.0, -3.0]) - - models = [model1, model2] - len_models = len(models) - estimation_info = [ - Mock(return_value={"a": 1, "b": 2, "c": 3}), - Mock(return_value={"x": 4, "y": 5}), - ] - X = np.array([1.0, 2.0, 3.0]) - H = np.array([[0.8, 0.2], [0.7, 0.3], [0.9, 0.1]]) - H_norm = H / H.sum(axis=1, keepdims=True) - - models, params, weights = match_clusters_for_models_akaike(models, X, H_norm, estimation_info, min_samples=1) - assert len(params) == len_models - assert len(weights) == len_models - - def test_main_logic(self): - models = [Mock(spec=ContinuousDistribution) for _ in range(2)] - len_models = len(models) - for i, m in enumerate(models): - m.params = {"param"} - m.set_params_from_vector = Mock() - m.lpdf.return_value = np.array([-0.5, -1.0, -1.5]) - - X = np.array([1.0, 2.0, 3.0]) - H = np.array([[0.5, 0.5], [0.5, 0.5], [0.5, 0.5]]) - - est_funcs = [Mock(return_value={"param": 1.0}), Mock(return_value={"param": 2.0})] - - models_out, params, weights = match_clusters_for_models_akaike(models, X, H, est_funcs, min_samples=1) - - assert len(params) == len_models - assert all(isinstance(p, dict) and "param" in p for p in params) - assert abs(sum(weights) - 1.0) < COMPARISON_CONSTANT - - def test_single_model_single_cluster(self): - model = Mock(spec=ContinuousDistribution) - model.params = {"p"} - model.set_params_from_vector = Mock() - model.lpdf.return_value = np.array([-1.0, -2.0]) - - X = np.array([1.0, 2.0]) - H = np.array([[1.0], [1.0]]) - H_norm = H / H.sum(axis=1, keepdims=True) - - estimation = [Mock(return_value={"p": 5.0})] - - models, params, weights = match_clusters_for_models_akaike([model], X, H_norm, estimation, min_samples=1) - assert len(params) == 1 - assert weights == [1.0] - - def test_extreme_log_probs_handled(self, mock_models, estimation_info, X): - mock_models[0].lpdf.return_value = np.array([-1e20, -1e19, -1e18]) - mock_models[1].lpdf.return_value = np.array([-1e-20, -1e-19, -1e-18]) - len_models = len(mock_models) - H = np.array([[0.6, 0.4], [0.7, 0.3], [0.5, 0.5]]) - H_norm = H / H.sum(axis=1, keepdims=True) - models, params, weights = match_clusters_for_models_akaike( - mock_models, X, H_norm, estimation_info, min_samples=1 - ) - assert len(params) == len_models - assert len(weights) == len_models - assert abs(sum(weights) - 1.0) < COMPARISON_CONSTANT - - def test_three_models_three_clusters(self): - models = [Mock(spec=ContinuousDistribution) for _ in range(3)] - len_models = len(models) - for i, model in enumerate(models): - model.params = {"param"} - model.set_params_from_vector = Mock() - model.lpdf.return_value = np.array([-0.5 - i, -1.0 - i, -1.5 - i]) - - X = np.array([1.0, 2.0, 3.0]) - H = np.full((3, 3), 1 / 3) - H_norm = H / H.sum(axis=1, keepdims=True) - - est_funcs = [ - Mock(return_value={"param": 1.0}), - Mock(return_value={"param": 2.0}), - Mock(return_value={"param": 3.0}), - ] - - models_out, params, weights = match_clusters_for_models_akaike(models, X, H_norm, est_funcs, min_samples=1) - assert len(params) == len_models - assert all(p != {} for p in params) - assert abs(sum(weights) - 1.0) < COMPARISON_CONSTANT - - def test_all_clusters_valid_exact_match(self, mock_models, estimation_info, X): - H = np.array([[0.5, 0.5], [0.6, 0.4], [0.4, 0.6]]) - H_norm = H / H.sum(axis=1, keepdims=True) - len_models = len(mock_models) - models, params, weights = match_clusters_for_models_akaike( - mock_models, X, H_norm, estimation_info, min_samples=1 - ) - assert len(params) == len_models - assert abs(sum(weights) - 1.0) < COMPARISON_CONSTANT - - def test_empty_permutations_case(self): - models = [Mock(spec=ContinuousDistribution) for _ in range(2)] - len_models = len(models) - for model in models: - model.params = {"param"} - model.set_params_from_vector = Mock() - model.lpdf.return_value = np.array([-1.0, -2.0, -3.0]) - - X = np.array([1.0, 2.0, 3.0]) - H = np.array([[0.5, 0.5], [0.5, 0.5], [0.5, 0.5]]) - H_norm = H / H.sum(axis=1, keepdims=True) - - est_funcs = [Mock(return_value={"param": 1.0}), Mock(return_value={"param": 2.0})] - - models, params, weights = match_clusters_for_models_akaike(models, X, H_norm, est_funcs, min_samples=1) - assert len(params) == len_models - assert all(p != {} for p in params) - assert abs(sum(weights) - 1.0) < COMPARISON_CONSTANT diff --git a/rework_tests/unit/test_initializers/match_cluster_strategies/test_likelihood.py b/rework_tests/unit/test_initializers/match_cluster_strategies/test_likelihood.py deleted file mode 100644 index 6e27f385..00000000 --- a/rework_tests/unit/test_initializers/match_cluster_strategies/test_likelihood.py +++ /dev/null @@ -1,187 +0,0 @@ -"""A module that provides tests for cluster match strategy with LikelyHood method""" - -__author__ = "Viktor Khanukaev" -__copyright__ = "Copyright (c) 2025 PySATL project" -__license__ = "SPDX-License-Identifier: MIT" - -from unittest.mock import Mock - -import numpy as np -import pytest -from rework_pysatl_mpest.distributions.continuous_dist import ContinuousDistribution -from rework_pysatl_mpest.initializers.cluster_match_strategy import match_clusters_for_models_log_likelihood - -COMPARISON_CONSTANT = 1e-10 - - -class TestMatchClustersForModelsLogLikelihood: - @pytest.fixture - def mock_models(self, n=2): - models = [Mock(spec=ContinuousDistribution) for _ in range(n)] - for model in models: - model.params = {"param1", "param2"} - model.set_params_from_vector = Mock() - model.lpdf = Mock(return_value=np.array([-1.0, -2.0, -3.0])) - return models - - @pytest.fixture - def estimation_info(self, n=2): - funcs = [Mock() for _ in range(n)] - for f in funcs: - f.return_value = {"param1": 1.5, "param2": 2.5} - return funcs - - @pytest.fixture - def X(self): - return np.array([1.0, 2.0, 3.0]) - - @pytest.fixture - def H_valid(self): - H_raw = np.array([[0.8, 0.2], [0.7, 0.3], [0.9, 0.1]]) - return H_raw / H_raw.sum(axis=1, keepdims=True) - - def test_H_sum_not_1_raises(self, mock_models, estimation_info, X): - H_invalid = np.array([[0.8, 0.1], [0.7, 0.2], [0.9, 0.05]]) - with pytest.raises(ValueError, match="Sum of H matrix weights must be equal to 1"): - match_clusters_for_models_log_likelihood(mock_models, X, H_invalid, estimation_info, min_samples=1) - - def test_estimation_info_length_mismatch(self, mock_models, estimation_info, X, H_valid): - with pytest.raises(ValueError, match="Number of estimation functions must match number of models"): - match_clusters_for_models_log_likelihood(mock_models, X, H_valid, estimation_info[:1], min_samples=1) - - def test_insufficient_valid_clusters(self, mock_models, estimation_info, X): - H_low = np.array([[0.999, 0.001], [0.998, 0.002], [0.997, 0.003]]) - H_norm = H_low / H_low.sum(axis=1, keepdims=True) - models, params, weights = match_clusters_for_models_log_likelihood( - mock_models, X, H_norm, estimation_info, min_samples=10 - ) - assert all(p == {} for p in params) - assert all(abs(w - 0.5) < COMPARISON_CONSTANT for w in weights) - - def test_no_valid_clusters(self, mock_models, estimation_info, X): - H_low = np.array([[0.999, 0.001], [0.998, 0.002], [0.997, 0.003]]) - H_norm = H_low / H_low.sum(axis=1, keepdims=True) - models, params, weights = match_clusters_for_models_log_likelihood( - mock_models, X, H_norm, estimation_info, min_samples=100 - ) - assert all(p == {} for p in params) - assert all(abs(w - 0.5) < COMPARISON_CONSTANT for w in weights) - - def test_more_clusters_than_models(self, mock_models, estimation_info, X): - models = [Mock(spec=ContinuousDistribution) for _ in range(2)] - len_models = len(models) - for model in models: - model.params = {"param"} - model.set_params_from_vector = Mock() - model.lpdf.return_value = np.array([-0.5, -1.0, -1.5]) - - X = np.array([1.0, 2.0, 3.0]) - H = np.array([[0.4, 0.4, 0.2], [0.4, 0.4, 0.2], [0.4, 0.4, 0.2]]) - H_norm = H / H.sum(axis=1, keepdims=True) - - est_funcs = [Mock(return_value={"param": 1.0}), Mock(return_value={"param": 2.0})] - - models_out, params, weights = match_clusters_for_models_log_likelihood( - models, X, H_norm, est_funcs, min_samples=1 - ) - norm_weights = np.asarray(weights) / sum(weights) - assert len(params) == len_models - assert all(p != {} for p in params) - assert abs(sum(norm_weights) - 1.0) < COMPARISON_CONSTANT - - def test_basic_assignment(self, mock_models, estimation_info, X, H_valid): - len_models = len(mock_models) - mock_models[0].lpdf.return_value = np.array([-0.1, -0.2, -0.3]) - mock_models[1].lpdf.return_value = np.array([-2.0, -3.0, -4.0]) - models, params, weights = match_clusters_for_models_log_likelihood( - mock_models, X, H_valid, estimation_info, min_samples=1 - ) - assert len(params) == len_models - assert len(weights) == len_models - assert abs(sum(weights) - 1.0) < COMPARISON_CONSTANT - - def test_main_logic(self): - models = [Mock(spec=ContinuousDistribution) for _ in range(2)] - len_models = len(models) - for m in models: - m.params = {"a"} - m.set_params_from_vector = Mock() - m.lpdf.return_value = np.array([-0.1, -0.2, -0.3]) - - X = np.array([1.0, 2.0, 3.0]) - H = np.array([[0.5, 0.5], [0.5, 0.5], [0.5, 0.5]]) - - est_funcs = [Mock(return_value={"a": 1.0}), Mock(return_value={"a": 2.0})] - - models_out, params, weights = match_clusters_for_models_log_likelihood(models, X, H, est_funcs, min_samples=1) - - assert len(params) == len_models - assert all(isinstance(p, dict) and "a" in p for p in params) - assert abs(sum(weights) - 1.0) < COMPARISON_CONSTANT - - def test_single_model_single_cluster(self): - model = Mock(spec=ContinuousDistribution) - model.params = {"p"} - model.set_params_from_vector = Mock() - model.lpdf.return_value = np.array([-1.0, -2.0]) - - X = np.array([1.0, 2.0]) - H = np.array([[1.0], [1.0]]) - H_norm = H / H.sum(axis=1, keepdims=True) - - estimation = [Mock(return_value={"p": 5.0})] - - models, params, weights = match_clusters_for_models_log_likelihood( - [model], X, H_norm, estimation, min_samples=1 - ) - assert len(params) == 1 - assert weights == [1.0] - - def test_extreme_log_probs_handled(self, mock_models, estimation_info, X): - mock_models[0].lpdf.return_value = np.array([-1e20, -1e19, -1e18]) - mock_models[1].lpdf.return_value = np.array([-1e-20, -1e-19, -1e-18]) - len_models = len(mock_models) - H = np.array([[0.6, 0.4], [0.7, 0.3], [0.5, 0.5]]) - H_norm = H / H.sum(axis=1, keepdims=True) - models, params, weights = match_clusters_for_models_log_likelihood( - mock_models, X, H_norm, estimation_info, min_samples=1 - ) - assert len(params) == len_models - assert len(weights) == len_models - assert abs(sum(weights) - 1.0) < COMPARISON_CONSTANT - - def test_zero_weight_cluster_handling(self): - models = [Mock(spec=ContinuousDistribution) for _ in range(2)] - for model in models: - model.params = {"param"} - model.set_params_from_vector = Mock() - model.lpdf.return_value = np.array([-1.0, -2.0, -3.0]) - - X = np.array([1.0, 2.0, 3.0]) - H = np.array([[1.0, 0.0], [1.0, 0.0], [1.0, 0.0]]) - H_norm = H / H.sum(axis=1, keepdims=True) - - est_funcs = [Mock(return_value={"param": 1.0}), Mock(return_value={"param": 2.0})] - - models, params, weights = match_clusters_for_models_log_likelihood(models, X, H_norm, est_funcs, min_samples=1) - assert all(p == {} for p in params) - assert all(abs(w - 0.5) < COMPARISON_CONSTANT for w in weights) - - def test_equal_scores_different_clusters(self): - models = [Mock(spec=ContinuousDistribution) for _ in range(2)] - len_params = len(models) - for model in models: - model.params = {"param"} - model.set_params_from_vector = Mock() - model.lpdf.return_value = np.array([-0.5, -1.0, -1.5]) - - X = np.array([1.0, 2.0, 3.0]) - H = np.array([[0.6, 0.4], [0.7, 0.3], [0.5, 0.5]]) - H_norm = H / H.sum(axis=1, keepdims=True) - - est_funcs = [Mock(return_value={"param": 1.0}), Mock(return_value={"param": 2.0})] - - models, params, weights = match_clusters_for_models_log_likelihood(models, X, H_norm, est_funcs, min_samples=1) - assert len(params) == len_params - assert all(p != {} for p in params) - assert abs(sum(weights) - 1.0) < COMPARISON_CONSTANT diff --git a/rework_tests/unit/test_initializers/test_clusterize_initializer.py b/rework_tests/unit/test_initializers/test_clusterize_initializer.py index f02c11fa..078418b3 100644 --- a/rework_tests/unit/test_initializers/test_clusterize_initializer.py +++ b/rework_tests/unit/test_initializers/test_clusterize_initializer.py @@ -1,370 +1,371 @@ -"""A module that provides tests for ClusterizeInitializer""" - -__author__ = "Viktor Khanukaev" -__copyright__ = "Copyright (c) 2025 PySATL project" -__license__ = "SPDX-License-Identifier: MIT" - -from types import MappingProxyType -from unittest.mock import Mock, patch - -import numpy as np -import pytest -from rework_pysatl_mpest.core.mixture import MixtureModel -from rework_pysatl_mpest.distributions.continuous_dist import ContinuousDistribution -from rework_pysatl_mpest.initializers.clusterize_initializer import ClusterizeInitializer -from rework_pysatl_mpest.initializers.strategies import ClusterMatchStrategy, EstimationStrategy -from rework_pysatl_mpest.optimizers import ScipyNelderMead - - -class TestClusterizeInitializer: - def setup_method(self): - self.mock_clusterizer = Mock() - self.mock_distributions = [Mock(spec=ContinuousDistribution) for _ in range(5)] - - for dist in self.mock_distributions: - dist.params = {"mean", "std"} - dist.params_to_optimize = {"mean", "std"} - dist.set_params_from_vector = Mock() - dist.lpdf = Mock(return_value=np.array([-0.5, -1.0, -1.5])) - dist.get_params_vector = Mock(return_value=np.array([0.0, 1.0])) - dist.q_function = Mock(return_value=-10.0) - - @pytest.mark.parametrize("is_accurate,is_soft", [(True, True), (True, False), (False, True), (False, False)]) - def test_initialization_parameters(self, is_accurate, is_soft): - initializer = ClusterizeInitializer(is_accurate=is_accurate, is_soft=is_soft, clusterizer=self.mock_clusterizer) - - assert initializer.is_accurate == is_accurate - assert initializer.is_soft == is_soft - assert initializer.clusterizer == self.mock_clusterizer - assert initializer.n_components is None - assert initializer.cluster_match_strategy == ClusterMatchStrategy.LIKELIHOOD - assert initializer.estimation_strategies == [] - assert initializer.models == [] - - def test_soft_clusterize(self): - initializer = ClusterizeInitializer(is_accurate=True, is_soft=True, clusterizer=self.mock_clusterizer) - - X = np.array([1.0, 2.0, 3.0]) - expected_weights = np.array([[0.8, 0.2], [0.7, 0.3], [0.9, 0.1]]) - - self.mock_clusterizer.fit_transform = Mock(return_value=expected_weights) - result = initializer._clusterize(X, self.mock_clusterizer) - - np.testing.assert_array_equal(result, expected_weights) - - assert self.mock_clusterizer.fit_transform.call_count == 1 - called_arg = self.mock_clusterizer.fit_transform.call_args[0][0] - np.testing.assert_array_equal(called_arg, X) - - def test_hard_clusterize_no_outliers(self): - initializer = ClusterizeInitializer(is_accurate=True, is_soft=False, clusterizer=self.mock_clusterizer) - - X = np.array([1.0, 2.0, 3.0, 4.0]) - labels = np.array([0, 1, 0, 1]) - - self.mock_clusterizer.fit_predict = Mock(return_value=labels) - result = initializer._clusterize(X, self.mock_clusterizer) - - expected = np.array([[1.0, 0.0], [0.0, 1.0], [1.0, 0.0], [0.0, 1.0]]) - np.testing.assert_array_equal(result, expected) - - def test_hard_clusterize_single_cluster_no_outliers(self): - initializer = ClusterizeInitializer(is_accurate=True, is_soft=False, clusterizer=self.mock_clusterizer) - - X = np.array([1.0, 2.0, 3.0]) - labels = np.array([0, 0, 0]) - - self.mock_clusterizer.fit_predict = Mock(return_value=labels) - result = initializer._clusterize(X, self.mock_clusterizer) - - expected = np.array([[1.0], [1.0], [1.0]]) - np.testing.assert_array_equal(result, expected) - - def test_hard_clusterize_with_outliers(self): - initializer = ClusterizeInitializer(is_accurate=True, is_soft=False, clusterizer=self.mock_clusterizer) - - X = np.array([1.0, 2.0, 3.0]) - labels = np.array([0, -1, 1]) - - self.mock_clusterizer.fit_predict = Mock(return_value=labels) - result = initializer._clusterize(X, self.mock_clusterizer) - - expected = np.array([[1.0, 0.0], [0.5, 0.5], [0.0, 1.0]]) - np.testing.assert_array_equal(result, expected) - - def test_hard_clusterize_all_outliers(self): - initializer = ClusterizeInitializer(is_accurate=True, is_soft=False, clusterizer=self.mock_clusterizer) - - X = np.array([1.0, 2.0]) - labels = np.array([-1, -1]) - - self.mock_clusterizer.fit_predict = Mock(return_value=labels) - result = initializer._clusterize(X, self.mock_clusterizer) - - expected = np.array([[1.0], [1.0]]) - np.testing.assert_array_equal(result, expected) - - def test_hard_clusterize_non_consecutive_labels(self): - initializer = ClusterizeInitializer(is_accurate=True, is_soft=False, clusterizer=self.mock_clusterizer) - - X = np.array([1.0, 2.0, 3.0, 4.0]) - labels = np.array([2, 5, 2, 5]) - - self.mock_clusterizer.fit_predict = Mock(return_value=labels) - result = initializer._clusterize(X, self.mock_clusterizer) - - expected = np.array([[1.0, 0.0], [0.0, 1.0], [1.0, 0.0], [0.0, 1.0]]) - np.testing.assert_array_equal(result, expected) - - @pytest.mark.parametrize("is_soft", [True, False]) - def test_clusterize_failure(self, is_soft): - initializer = ClusterizeInitializer(is_accurate=True, is_soft=is_soft, clusterizer=self.mock_clusterizer) - - X = np.array([1.0, 2.0, 3.0]) - - if is_soft: - self.mock_clusterizer.fit_transform = Mock(side_effect=Exception("Failed")) - expected_error = "Fuzzy clusterizer failed" - else: - self.mock_clusterizer.fit_predict = Mock(side_effect=Exception("Failed")) - expected_error = "Hard clusterizer failed" - - with pytest.raises(ValueError, match=expected_error): - initializer._clusterize(X, self.mock_clusterizer) - - def test_clusterizer_missing_methods(self): - class InvalidClusterizer: - pass - - invalid_clusterizer = InvalidClusterizer() - X = np.array([1.0, 2.0, 3.0]) - - initializer_soft = ClusterizeInitializer(True, True, invalid_clusterizer) - with pytest.raises(ValueError, match="Clusterizer doesn't have required method"): - initializer_soft._clusterize(X, invalid_clusterizer) - - initializer_hard = ClusterizeInitializer(True, False, invalid_clusterizer) - with pytest.raises(ValueError, match="Clusterizer doesn't have required method"): - initializer_hard._clusterize(X, invalid_clusterizer) - - def test_perform_accurate_init_normal_path(self): - initializer = ClusterizeInitializer(is_accurate=True, is_soft=True, clusterizer=self.mock_clusterizer) - - X = np.array([1.0, 2.0, 3.0]) - H = np.array([[0.8, 0.2], [0.7, 0.3], [0.9, 0.1]]) - dists = [self.mock_distributions[0], self.mock_distributions[1]] - - with ( - patch.object(initializer, "_clusterize", return_value=H), - patch( - "rework_pysatl_mpest.initializers.cluster_match_strategy.match_clusters_for_models_akaike" - ) as mock_match, - ): - mock_match.return_value = ( - [dists[0], dists[1]], - [{"mean": 1.0, "std": 0.5}, {"mean": 2.0, "std": 1.0}], - [0.4, 0.6], - ) - - result = initializer.perform( - X=X, - dists=dists, - cluster_match_strategy=ClusterMatchStrategy.AKAIKE, - estimation_strategies=[EstimationStrategy.QFUNCTION, EstimationStrategy.QFUNCTION], - ) - len_components = len(dists) - - assert isinstance(result, MixtureModel) - assert len(result.components) == len_components - assert len(result.weights) == len_components - assert sum(result.weights) == pytest.approx(1.0) - np.testing.assert_array_equal(result.weights, [0.8, 0.2]) - - def test_perform_accurate_init_fallback_to_fast_init(self): - initializer = ClusterizeInitializer(is_accurate=True, is_soft=True, clusterizer=self.mock_clusterizer) - - X = np.array([1.0, 2.0, 3.0]) - H = np.array([[0.8, 0.2], [0.7, 0.3], [0.9, 0.1]]) - dists = [self.mock_distributions[0], self.mock_distributions[1]] - - with ( - patch.object(initializer, "_clusterize", return_value=H), - patch( - "rework_pysatl_mpest.initializers.clusterize_initializer.match_clusters_for_models_log_likelihood" - ) as mock_match, - ): - mock_match.return_value = ([dists[0], dists[1]], [None, {"mean": 2.0, "std": 1.0}], [0.5, 0.5]) - - with patch.object(initializer, "_fast_init") as mock_fast_init: - mock_fast_init.return_value = ([dists[0], dists[1]], [0.3, 0.7]) - optimizer = ScipyNelderMead() - result = initializer.perform( - X=X, - dists=dists, - cluster_match_strategy=ClusterMatchStrategy.LIKELIHOOD, - estimation_strategies=[EstimationStrategy.QFUNCTION, EstimationStrategy.QFUNCTION], - optimizer=optimizer, - ) - - mock_fast_init.assert_called_once_with(X, H, optimizer) - np.testing.assert_array_equal(result.weights, [0.3, 0.7]) - - def test_perform_fast_init(self): - initializer = ClusterizeInitializer(is_accurate=False, is_soft=True, clusterizer=self.mock_clusterizer) - - X = np.array([1.0, 2.0, 3.0]) - H = np.array([[0.8, 0.2], [0.7, 0.3], [0.9, 0.1]]) - dists = [self.mock_distributions[0], self.mock_distributions[1]] - - with ( - patch.object(initializer, "_clusterize", return_value=H), - patch.object(initializer, "_estimation_strategies") as mock_est_strategies, - ): - mock_est_strategies.__getitem__.return_value = Mock(return_value={"mean": 0.0, "std": 1.0}) - - result = initializer.perform( - X=X, - dists=dists, - cluster_match_strategy=ClusterMatchStrategy.LIKELIHOOD, - estimation_strategies=[EstimationStrategy.QFUNCTION, EstimationStrategy.QFUNCTION], - ) - len_components = len(dists) - assert isinstance(result, MixtureModel) - assert len(result.components) == len_components - assert len(result.weights) == len_components - assert sum(result.weights) == pytest.approx(1.0) - - dists[0].set_params_from_vector.assert_called_once() - dists[1].set_params_from_vector.assert_called_once() - - def test_weight_normalization(self): - initializer = ClusterizeInitializer(is_accurate=True, is_soft=True, clusterizer=self.mock_clusterizer) - - X = np.array([1.0, 2.0, 3.0]) - H = np.array([[0.8, 0.2], [0.7, 0.3], [0.9, 0.1]]) - - with ( - patch.object(initializer, "_clusterize", return_value=H), - patch.object(initializer, "_accurate_init") as mock_acc_init, - ): - mock_acc_init.return_value = ([self.mock_distributions[0], self.mock_distributions[1]], [2.0, 3.0]) - - result = initializer.perform( - X=X, - dists=[self.mock_distributions[0], self.mock_distributions[1]], - cluster_match_strategy=ClusterMatchStrategy.AKAIKE, - estimation_strategies=[EstimationStrategy.QFUNCTION, EstimationStrategy.QFUNCTION], - ) - - expected_weights = [0.4, 0.6] - for i, expected in enumerate(expected_weights): - assert result.weights[i] == pytest.approx(expected, abs=0.01) - assert sum(result.weights) == pytest.approx(1.0) - - def test_validation_errors_accurate_init(self): - initializer = ClusterizeInitializer(is_accurate=True, is_soft=True, clusterizer=self.mock_clusterizer) - - X = np.array([1.0, 2.0, 3.0]) - H = np.array([[0.8, 0.2], [0.7, 0.3], [0.9, 0.1]]) - - initializer.n_components = None - initializer.models = [self.mock_distributions[0], self.mock_distributions[1]] - initializer.estimation_strategies = [EstimationStrategy.QFUNCTION] * 2 - - with pytest.raises(ValueError, match="n_components must be set before calling _accurate_init"): - initializer._accurate_init(X, H) - - initializer.n_components = 2 - initializer.models = [self.mock_distributions[0], self.mock_distributions[1]] - initializer.estimation_strategies = [EstimationStrategy.QFUNCTION] - - with pytest.raises(ValueError, match="Count of models must match count of estimation strategies"): - initializer._accurate_init(X, H) - - def test_validation_errors_fast_init(self): - initializer = ClusterizeInitializer(is_accurate=False, is_soft=True, clusterizer=self.mock_clusterizer) - - X = np.array([1.0, 2.0, 3.0]) - H = np.array([[0.8, 0.2], [0.7, 0.3], [0.9, 0.1]]) - - initializer.n_components = None - initializer.models = [self.mock_distributions[0], self.mock_distributions[1]] - initializer.estimation_strategies = [EstimationStrategy.QFUNCTION] * 2 - - with pytest.raises(ValueError, match="n_components must be set before calling _fast_init"): - initializer._fast_init(X, H) - - def test_different_distribution_types(self): - initializer = ClusterizeInitializer(is_accurate=False, is_soft=True, clusterizer=self.mock_clusterizer) - - X = np.array([1.0, 2.0, 3.0, 4.0]) - H = np.array([[0.8, 0.2, 0.1, 0.3], [0.7, 0.3, 0.2, 0.1], [0.9, 0.1, 0.3, 0.2], [0.6, 0.4, 0.5, 0.1]]) - - distributions = [] - param_sets = [{"mean", "std"}, {"alpha", "beta"}, {"lambda"}, {"shape", "scale"}] - - for i, params in enumerate(param_sets): - mock_dist = Mock(spec=ContinuousDistribution) - mock_dist.params = params - mock_dist.set_params_from_vector = Mock() - distributions.append(mock_dist) - - self.mock_clusterizer.fit_transform = Mock(return_value=H) - - with patch.object(initializer, "_estimation_strategies") as mock_est_strategies: - - def mock_estimation_side_effect(model, X, H, optimizer): - param_map = { - frozenset({"mean", "std"}): {"mean": 2.0, "std": 1.0}, - frozenset({"alpha", "beta"}): {"alpha": 2.0, "beta": 2.0}, - frozenset({"lambda"}): {"lambda": 0.5}, - frozenset({"shape", "scale"}): {"shape": 2.0, "scale": 1.0}, - } - return param_map.get(frozenset(model.params), {}) - - mock_est_strategies.__getitem__.return_value = Mock(side_effect=mock_estimation_side_effect) - - result = initializer.perform( - X=X, - dists=distributions, - cluster_match_strategy=ClusterMatchStrategy.LIKELIHOOD, - estimation_strategies=[EstimationStrategy.QFUNCTION] * len(distributions), - ) - - assert isinstance(result, MixtureModel) - assert len(result.components) == len(distributions) - for dist in distributions: - dist.set_params_from_vector.assert_called_once() - - def test_edge_case_empty_data(self): - initializer = ClusterizeInitializer(is_accurate=True, is_soft=True, clusterizer=self.mock_clusterizer) - - X = np.array([]) - H = np.array([]).reshape(0, 2) - distributions = [Mock(spec=ContinuousDistribution) for _ in range(2)] - - for dist in distributions: - dist.set_params_from_vector = Mock() - - self.mock_clusterizer.fit_transform = Mock(return_value=H) - - with patch.object(initializer, "_accurate_init") as mock_acc_init: - mock_acc_init.return_value = (distributions, [0.5, 0.5]) - - result = initializer.perform( - X=X, - dists=distributions, - cluster_match_strategy=ClusterMatchStrategy.LIKELIHOOD, - estimation_strategies=[EstimationStrategy.QFUNCTION, EstimationStrategy.QFUNCTION], - ) - len_components = len(distributions) - assert isinstance(result, MixtureModel) - assert len(result.components) == len_components - np.testing.assert_array_equal(result.weights, [0.5, 0.5]) - - def test_class_variables_immutable(self): - assert isinstance(ClusterizeInitializer._estimation_strategies, MappingProxyType) - assert isinstance(ClusterizeInitializer._cluster_match_strategies, MappingProxyType) - assert EstimationStrategy.QFUNCTION in ClusterizeInitializer._estimation_strategies - assert ClusterMatchStrategy.LIKELIHOOD in ClusterizeInitializer._cluster_match_strategies - assert ClusterMatchStrategy.AKAIKE in ClusterizeInitializer._cluster_match_strategies +# """A module that provides tests for ClusterizeInitializer""" +# +# __author__ = "Viktor Khanukaev" +# __copyright__ = "Copyright (c) 2025 PySATL project" +# __license__ = "SPDX-License-Identifier: MIT" +# +# from types import MappingProxyType +# from unittest.mock import Mock, patch +# +# import numpy as np +# import pytest +# from rework_pysatl_mpest.core.mixture import MixtureModel +# from rework_pysatl_mpest.distributions.continuous_dist import ContinuousDistribution +# from rework_pysatl_mpest.initializers.clusterize_initializer import ClusterizeInitializer +# from rework_pysatl_mpest.initializers.strategies import EstimationStrategy +# from rework_pysatl_mpest.optimizers import ScipyNelderMead +# +# +# class TestClusterizeInitializer: +# def setup_method(self): +# self.mock_clusterizer = Mock() +# self.mock_distributions = [Mock(spec=ContinuousDistribution) for _ in range(5)] +# +# for dist in self.mock_distributions: +# dist.params = {"mean", "std"} +# dist.params_to_optimize = {"mean", "std"} +# dist.set_params_from_vector = Mock() +# dist.lpdf = Mock(return_value=np.array([-0.5, -1.0, -1.5])) +# dist.get_params_vector = Mock(return_value=np.array([0.0, 1.0])) +# dist.q_function = Mock(return_value=-10.0) +# +# @pytest.mark.parametrize("is_accurate,is_soft", [(True, True), (True, False), (False, True), (False, False)]) +# def test_initialization_parameters(self, is_accurate, is_soft): +# initializer = ClusterizeInitializer(is_accurate=is_accurate, is_soft=is_soft, +# clusterizer=self.mock_clusterizer) +# +# assert initializer.is_accurate == is_accurate +# assert initializer.is_soft == is_soft +# assert initializer.clusterizer == self.mock_clusterizer +# assert initializer.n_components is None +# assert initializer.cluster_match_strategy == ClusterMatchStrategy.LIKELIHOOD +# assert initializer.estimation_strategies == [] +# assert initializer.models == [] +# +# def test_soft_clusterize(self): +# initializer = ClusterizeInitializer(is_accurate=True, is_soft=True, clusterizer=self.mock_clusterizer) +# +# X = np.array([1.0, 2.0, 3.0]) +# expected_weights = np.array([[0.8, 0.2], [0.7, 0.3], [0.9, 0.1]]) +# +# self.mock_clusterizer.fit_transform = Mock(return_value=expected_weights) +# result = initializer._clusterize(X, self.mock_clusterizer) +# +# np.testing.assert_array_equal(result, expected_weights) +# +# assert self.mock_clusterizer.fit_transform.call_count == 1 +# called_arg = self.mock_clusterizer.fit_transform.call_args[0][0] +# np.testing.assert_array_equal(called_arg, X) +# +# def test_hard_clusterize_no_outliers(self): +# initializer = ClusterizeInitializer(is_accurate=True, is_soft=False, clusterizer=self.mock_clusterizer) +# +# X = np.array([1.0, 2.0, 3.0, 4.0]) +# labels = np.array([0, 1, 0, 1]) +# +# self.mock_clusterizer.fit_predict = Mock(return_value=labels) +# result = initializer._clusterize(X, self.mock_clusterizer) +# +# expected = np.array([[1.0, 0.0], [0.0, 1.0], [1.0, 0.0], [0.0, 1.0]]) +# np.testing.assert_array_equal(result, expected) +# +# def test_hard_clusterize_single_cluster_no_outliers(self): +# initializer = ClusterizeInitializer(is_accurate=True, is_soft=False, clusterizer=self.mock_clusterizer) +# +# X = np.array([1.0, 2.0, 3.0]) +# labels = np.array([0, 0, 0]) +# +# self.mock_clusterizer.fit_predict = Mock(return_value=labels) +# result = initializer._clusterize(X, self.mock_clusterizer) +# +# expected = np.array([[1.0], [1.0], [1.0]]) +# np.testing.assert_array_equal(result, expected) +# +# def test_hard_clusterize_with_outliers(self): +# initializer = ClusterizeInitializer(is_accurate=True, is_soft=False, clusterizer=self.mock_clusterizer) +# +# X = np.array([1.0, 2.0, 3.0]) +# labels = np.array([0, -1, 1]) +# +# self.mock_clusterizer.fit_predict = Mock(return_value=labels) +# result = initializer._clusterize(X, self.mock_clusterizer) +# +# expected = np.array([[1.0, 0.0], [0.5, 0.5], [0.0, 1.0]]) +# np.testing.assert_array_equal(result, expected) +# +# def test_hard_clusterize_all_outliers(self): +# initializer = ClusterizeInitializer(is_accurate=True, is_soft=False, clusterizer=self.mock_clusterizer) +# +# X = np.array([1.0, 2.0]) +# labels = np.array([-1, -1]) +# +# self.mock_clusterizer.fit_predict = Mock(return_value=labels) +# result = initializer._clusterize(X, self.mock_clusterizer) +# +# expected = np.array([[1.0], [1.0]]) +# np.testing.assert_array_equal(result, expected) +# +# def test_hard_clusterize_non_consecutive_labels(self): +# initializer = ClusterizeInitializer(is_accurate=True, is_soft=False, clusterizer=self.mock_clusterizer) +# +# X = np.array([1.0, 2.0, 3.0, 4.0]) +# labels = np.array([2, 5, 2, 5]) +# +# self.mock_clusterizer.fit_predict = Mock(return_value=labels) +# result = initializer._clusterize(X, self.mock_clusterizer) +# +# expected = np.array([[1.0, 0.0], [0.0, 1.0], [1.0, 0.0], [0.0, 1.0]]) +# np.testing.assert_array_equal(result, expected) +# +# @pytest.mark.parametrize("is_soft", [True, False]) +# def test_clusterize_failure(self, is_soft): +# initializer = ClusterizeInitializer(is_accurate=True, is_soft=is_soft, clusterizer=self.mock_clusterizer) +# +# X = np.array([1.0, 2.0, 3.0]) +# +# if is_soft: +# self.mock_clusterizer.fit_transform = Mock(side_effect=Exception("Failed")) +# expected_error = "Fuzzy clusterizer failed" +# else: +# self.mock_clusterizer.fit_predict = Mock(side_effect=Exception("Failed")) +# expected_error = "Hard clusterizer failed" +# +# with pytest.raises(ValueError, match=expected_error): +# initializer._clusterize(X, self.mock_clusterizer) +# +# def test_clusterizer_missing_methods(self): +# class InvalidClusterizer: +# pass +# +# invalid_clusterizer = InvalidClusterizer() +# X = np.array([1.0, 2.0, 3.0]) +# +# initializer_soft = ClusterizeInitializer(True, True, invalid_clusterizer) +# with pytest.raises(ValueError, match="Clusterizer doesn't have required method"): +# initializer_soft._clusterize(X, invalid_clusterizer) +# +# initializer_hard = ClusterizeInitializer(True, False, invalid_clusterizer) +# with pytest.raises(ValueError, match="Clusterizer doesn't have required method"): +# initializer_hard._clusterize(X, invalid_clusterizer) +# +# def test_perform_accurate_init_normal_path(self): +# initializer = ClusterizeInitializer(is_accurate=True, is_soft=True, clusterizer=self.mock_clusterizer) +# +# X = np.array([1.0, 2.0, 3.0]) +# H = np.array([[0.8, 0.2], [0.7, 0.3], [0.9, 0.1]]) +# dists = [self.mock_distributions[0], self.mock_distributions[1]] +# +# with ( +# patch.object(initializer, "_clusterize", return_value=H), +# patch( +# "rework_pysatl_mpest.initializers.cluster_match_strategy.match_clusters_for_models_akaike" +# ) as mock_match, +# ): +# mock_match.return_value = ( +# [dists[0], dists[1]], +# [{"mean": 1.0, "std": 0.5}, {"mean": 2.0, "std": 1.0}], +# [0.4, 0.6], +# ) +# +# result = initializer.perform( +# X=X, +# dists=dists, +# cluster_match_strategy=ClusterMatchStrategy.AKAIKE, +# estimation_strategies=[EstimationStrategy.QFUNCTION, EstimationStrategy.QFUNCTION], +# ) +# len_components = len(dists) +# +# assert isinstance(result, MixtureModel) +# assert len(result.components) == len_components +# assert len(result.weights) == len_components +# assert sum(result.weights) == pytest.approx(1.0) +# np.testing.assert_array_equal(result.weights, [0.8, 0.2]) +# +# def test_perform_accurate_init_fallback_to_fast_init(self): +# initializer = ClusterizeInitializer(is_accurate=True, is_soft=True, clusterizer=self.mock_clusterizer) +# +# X = np.array([[1.0], [2.0], [3.0]]) +# H = np.array([[0.8, 0.2], [0.7, 0.3], [0.9, 0.1]]) +# dists = [self.mock_distributions[0], self.mock_distributions[1]] +# +# with ( +# patch.object(initializer, "_clusterize", return_value=H), +# patch( +# "rework_pysatl_mpest.initializers.clusterize_initializer.match_clusters_for_models_log_likelihood" +# ) as mock_match, +# ): +# mock_match.return_value = ([dists[0], dists[1]], [None, {"mean": 2.0, "std": 1.0}], [0.5, 0.5]) +# +# with patch.object(initializer, "_fast_init") as mock_fast_init: +# mock_fast_init.return_value = ([dists[0], dists[1]], [0.3, 0.7]) +# optimizer = ScipyNelderMead() +# result = initializer.perform( +# X=X, +# dists=dists, +# cluster_match_strategy=ClusterMatchStrategy.LIKELIHOOD, +# estimation_strategies=[EstimationStrategy.QFUNCTION, EstimationStrategy.QFUNCTION], +# optimizer=optimizer, +# ) +# +# mock_fast_init.assert_called_once_with(X, H, optimizer) +# np.testing.assert_array_equal(result.weights, [0.3, 0.7]) +# +# def test_perform_fast_init(self): +# initializer = ClusterizeInitializer(is_accurate=False, is_soft=True, clusterizer=self.mock_clusterizer) +# +# X = np.array([1.0, 2.0, 3.0]) +# H = np.array([[0.8, 0.2], [0.7, 0.3], [0.9, 0.1]]) +# dists = [self.mock_distributions[0], self.mock_distributions[1]] +# +# with ( +# patch.object(initializer, "_clusterize", return_value=H), +# patch.object(initializer, "_estimation_strategies") as mock_est_strategies, +# ): +# mock_est_strategies.__getitem__.return_value = Mock(return_value={"mean": 0.0, "std": 1.0}) +# +# result = initializer.perform( +# X=X, +# dists=dists, +# cluster_match_strategy=ClusterMatchStrategy.LIKELIHOOD, +# estimation_strategies=[EstimationStrategy.QFUNCTION, EstimationStrategy.QFUNCTION], +# ) +# len_components = len(dists) +# assert isinstance(result, MixtureModel) +# assert len(result.components) == len_components +# assert len(result.weights) == len_components +# assert sum(result.weights) == pytest.approx(1.0) +# +# dists[0].set_params_from_vector.assert_called_once() +# dists[1].set_params_from_vector.assert_called_once() +# +# def test_weight_normalization(self): +# initializer = ClusterizeInitializer(is_accurate=True, is_soft=True, clusterizer=self.mock_clusterizer) +# +# X = np.array([1.0, 2.0, 3.0]) +# H = np.array([[0.8, 0.2], [0.7, 0.3], [0.9, 0.1]]) +# +# with ( +# patch.object(initializer, "_clusterize", return_value=H), +# patch.object(initializer, "_accurate_init") as mock_acc_init, +# ): +# mock_acc_init.return_value = ([self.mock_distributions[0], self.mock_distributions[1]], [2.0, 3.0]) +# +# result = initializer.perform( +# X=X, +# dists=[self.mock_distributions[0], self.mock_distributions[1]], +# cluster_match_strategy=ClusterMatchStrategy.AKAIKE, +# estimation_strategies=[EstimationStrategy.QFUNCTION, EstimationStrategy.QFUNCTION], +# ) +# +# expected_weights = [0.4, 0.6] +# for i, expected in enumerate(expected_weights): +# assert result.weights[i] == pytest.approx(expected, abs=0.01) +# assert sum(result.weights) == pytest.approx(1.0) +# +# def test_validation_errors_accurate_init(self): +# initializer = ClusterizeInitializer(is_accurate=True, is_soft=True, clusterizer=self.mock_clusterizer) +# +# X = np.array([1.0, 2.0, 3.0]) +# H = np.array([[0.8, 0.2], [0.7, 0.3], [0.9, 0.1]]) +# +# initializer.n_components = None +# initializer.models = [self.mock_distributions[0], self.mock_distributions[1]] +# initializer.estimation_strategies = [EstimationStrategy.QFUNCTION] * 2 +# +# with pytest.raises(ValueError, match="n_components must be set before calling _accurate_init"): +# initializer._accurate_init(X, H) +# +# initializer.n_components = 2 +# initializer.models = [self.mock_distributions[0], self.mock_distributions[1]] +# initializer.estimation_strategies = [EstimationStrategy.QFUNCTION] +# +# with pytest.raises(ValueError, match="Count of models must match count of estimation strategies"): +# initializer._accurate_init(X, H) +# +# def test_validation_errors_fast_init(self): +# initializer = ClusterizeInitializer(is_accurate=False, is_soft=True, clusterizer=self.mock_clusterizer) +# +# X = np.array([1.0, 2.0, 3.0]) +# H = np.array([[0.8, 0.2], [0.7, 0.3], [0.9, 0.1]]) +# +# initializer.n_components = None +# initializer.models = [self.mock_distributions[0], self.mock_distributions[1]] +# initializer.estimation_strategies = [EstimationStrategy.QFUNCTION] * 2 +# +# with pytest.raises(ValueError, match="n_components must be set before calling _fast_init"): +# initializer._fast_init(X, H) +# +# def test_different_distribution_types(self): +# initializer = ClusterizeInitializer(is_accurate=False, is_soft=True, clusterizer=self.mock_clusterizer) +# +# X = np.array([1.0, 2.0, 3.0, 4.0]) +# H = np.array([[0.8, 0.2, 0.1, 0.3], [0.7, 0.3, 0.2, 0.1], [0.9, 0.1, 0.3, 0.2], [0.6, 0.4, 0.5, 0.1]]) +# +# distributions = [] +# param_sets = [{"mean", "std"}, {"alpha", "beta"}, {"lambda"}, {"shape", "scale"}] +# +# for i, params in enumerate(param_sets): +# mock_dist = Mock(spec=ContinuousDistribution) +# mock_dist.params = params +# mock_dist.set_params_from_vector = Mock() +# distributions.append(mock_dist) +# +# self.mock_clusterizer.fit_transform = Mock(return_value=H) +# +# with patch.object(initializer, "_estimation_strategies") as mock_est_strategies: +# +# def mock_estimation_side_effect(model, X, H, optimizer): +# param_map = { +# frozenset({"mean", "std"}): {"mean": 2.0, "std": 1.0}, +# frozenset({"alpha", "beta"}): {"alpha": 2.0, "beta": 2.0}, +# frozenset({"lambda"}): {"lambda": 0.5}, +# frozenset({"shape", "scale"}): {"shape": 2.0, "scale": 1.0}, +# } +# return param_map.get(frozenset(model.params), {}) +# +# mock_est_strategies.__getitem__.return_value = Mock(side_effect=mock_estimation_side_effect) +# +# result = initializer.perform( +# X=X, +# dists=distributions, +# cluster_match_strategy=ClusterMatchStrategy.LIKELIHOOD, +# estimation_strategies=[EstimationStrategy.QFUNCTION] * len(distributions), +# ) +# +# assert isinstance(result, MixtureModel) +# assert len(result.components) == len(distributions) +# for dist in distributions: +# dist.set_params_from_vector.assert_called_once() +# +# def test_edge_case_empty_data(self): +# initializer = ClusterizeInitializer(is_accurate=True, is_soft=True, clusterizer=self.mock_clusterizer) +# +# X = np.array([]) +# H = np.array([]).reshape(0, 2) +# distributions = [Mock(spec=ContinuousDistribution) for _ in range(2)] +# +# for dist in distributions: +# dist.set_params_from_vector = Mock() +# +# self.mock_clusterizer.fit_transform = Mock(return_value=H) +# +# with patch.object(initializer, "_accurate_init") as mock_acc_init: +# mock_acc_init.return_value = (distributions, [0.5, 0.5]) +# +# result = initializer.perform( +# X=X, +# dists=distributions, +# cluster_match_strategy=ClusterMatchStrategy.LIKELIHOOD, +# estimation_strategies=[EstimationStrategy.QFUNCTION, EstimationStrategy.QFUNCTION], +# ) +# len_components = len(distributions) +# assert isinstance(result, MixtureModel) +# assert len(result.components) == len_components +# np.testing.assert_array_equal(result.weights, [0.5, 0.5]) +# +# def test_class_variables_immutable(self): +# assert isinstance(ClusterizeInitializer._estimation_strategies, MappingProxyType) +# assert isinstance(ClusterizeInitializer._cluster_match_strategies, MappingProxyType) +# assert EstimationStrategy.QFUNCTION in ClusterizeInitializer._estimation_strategies +# assert ClusterMatchStrategy.LIKELIHOOD in ClusterizeInitializer._cluster_match_strategies +# assert ClusterMatchStrategy.AKAIKE in ClusterizeInitializer._cluster_match_strategies