diff --git a/pyproject.toml b/pyproject.toml index 75f9a45..127cb15 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -23,6 +23,10 @@ scikit-learn = ">=1.6.1,<2.0.0" scipy = ">=1.15.2,<2.0.0" seaborn = ">=0.13.2,<0.14.0" scikit-fuzzy = "^0.5.0" +kneed = "^0.8.5" +pywavelets = "^1.9.0" +gdown = "^5.2.0" +xgboost = "^3.1.1" [tool.poetry.group.dev.dependencies] pytest = "^8.3.4"
diff --git a/rework_pysatl_mpest/preprocessing/components_family/__init__.py b/rework_pysatl_mpest/preprocessing/components_family/__init__.py new file mode 100644 index 0000000..72c63e3 --- /dev/null +++ b/rework_pysatl_mpest/preprocessing/components_family/__init__.py @@ -0,0 +1,37 @@ +""" +components_family module for evaluating the family of components of a mixture + +This module provides a ready-made model for determining the most likely families of mixture components. +The choice of a mixture component family is important when working with mixtures of distributions. + +**Purpose** + +components_family module helps to speed up the search for the most suitable mixture component configuration +by narrowing down the search to a few options. + +**Usage Example** + +.. code-block:: python + + >>> import numpy as np + >>> from rework_pysatl_mpest.preprocessing.components_family import ComponentsFamily + >>> from rework_pysatl_mpest.preprocessing.components_family import XGBBaseModel + + >>> # Create sample data + >>> X = np.linspace(-10, 10, 200) + + >>> # Determine 5 possible configurations using XGBBaseModel + >>> model = ComponentsFamily(XGBBaseModel, top_k=5) + >>> configurations = model.predict(X) + + >>> print(f"Best 5 configurations: {configurations}") + >>> print(f"Best configuration: {configurations[0]}") +""" + +__author__ = "Mark Dubrovchenko" +__copyright__ = "Copyright (c) 2025 PySATL project" +__license__ = "SPDX-License-Identifier: MIT" + +from rework_pysatl_mpest.preprocessing.components_family.components_family import ComponentsFamily +from rework_pysatl_mpest.preprocessing.components_family.mixture_classifiers import XGBBaseModel + +__all__ = ["ComponentsFamily", "XGBBaseModel"]
diff --git a/rework_pysatl_mpest/preprocessing/components_family/classifier_criterions.py b/rework_pysatl_mpest/preprocessing/components_family/classifier_criterions.py new file mode 100644 index 0000000..55b6e3f --- /dev/null +++ b/rework_pysatl_mpest/preprocessing/components_family/classifier_criterions.py @@ -0,0 +1,73 @@ +"""Module which contains the collector of a vector of criterions for a mixture classifier""" + +__author__ = "Mark Dubrovchenko" +__copyright__ = "Copyright (c) 2025 PySATL project" +__license__ = "SPDX-License-Identifier: MIT" + +import warnings +from math import ceil + +import numpy as np +from rework_pysatl_mpest.preprocessing.components_family.criterions import base_criterions +from rework_pysatl_mpest.preprocessing.components_family.criterions.abstract_criterion import ( + AHistClassifierCriterion, + APeaksClassifierCriterion, + ASampleClassifierCriterion, +) +from scipy.stats import iqr + + +class MixtureClassifierCriterions: + """ + MixtureClassifierCriterions + + Parameters + ---------- + :criterions: list[ASampleClassifierCriterion | APeaksClassifierCriterion | AHistClassifierCriterion] + + — List of criterions for the mixture classifier + """ + + def __init__( + self, + criterions: list[ + ASampleClassifierCriterion | APeaksClassifierCriterion | AHistClassifierCriterion + ] = base_criterions, + ) -> None: + self.criterions = criterions + + @staticmethod + def _get_hist(X: np.ndarray) -> np.ndarray: +
"""A function for constructing a histogram with constraints""" + n = X.size + bmin = 20 + bmax = 150 + + h = 1 * iqr(X) * n ** (-1 / 3) + bins = ceil((X.max() - X.min()) / h) if h > 0 else bmin + nbins = max(bmin, min(bins, bmax)) + + hist = np.histogram(X, bins=nbins, density=True)[0] + + return hist + + @staticmethod + def _get_criterion( + X: np.ndarray, + hist: np.ndarray, + criterion: (ASampleClassifierCriterion | APeaksClassifierCriterion | AHistClassifierCriterion), + ) -> float: + """Function for obtaining a single criterion based on a sample""" + + warnings.filterwarnings("ignore") + + if isinstance(criterion, ASampleClassifierCriterion): + return criterion.score(X) + + return criterion.score(hist) + + def get_criterions(self, X: np.ndarray) -> dict[str, float]: + """Function for evaluating a feature vector based on a sample""" + + hist_list = self._get_hist(X) + return dict([(criterion.name, self._get_criterion(X, hist_list, criterion)) for criterion in self.criterions]) diff --git a/rework_pysatl_mpest/preprocessing/components_family/classifier_models/__init__.py b/rework_pysatl_mpest/preprocessing/components_family/classifier_models/__init__.py new file mode 100644 index 0000000..9fd68af --- /dev/null +++ b/rework_pysatl_mpest/preprocessing/components_family/classifier_models/__init__.py @@ -0,0 +1,14 @@ +"""Module which contains interface of the classifier model and supported classifier models""" + +__author__ = "Mark Dubrovchenko" +__copyright__ = "Copyright (c) 2025 PySATL project" +__license__ = "SPDX-License-Identifier: MIT" + +from rework_pysatl_mpest.preprocessing.components_family.classifier_models.classifier_interface import ( + IClassifier, +) +from rework_pysatl_mpest.preprocessing.components_family.classifier_models.classifier_models import ( + XGBClassifier, +) + +__all__ = ["IClassifier", "XGBClassifier"] diff --git a/rework_pysatl_mpest/preprocessing/components_family/classifier_models/classifier_interface.py b/rework_pysatl_mpest/preprocessing/components_family/classifier_models/classifier_interface.py new file mode 100644 index 0000000..95b6204 --- /dev/null +++ b/rework_pysatl_mpest/preprocessing/components_family/classifier_models/classifier_interface.py @@ -0,0 +1,26 @@ +"""Module which contains interface of the classifier model""" + +__author__ = "Mark Dubrovchenko" +__copyright__ = "Copyright (c) 2025 PySATL project" +__license__ = "SPDX-License-Identifier: MIT" + +from abc import ABC, abstractmethod + +import numpy as np + + +class IClassifier(ABC): + """Class representing an interface for classification models""" + + @property + @abstractmethod + def is_fitted(self) -> bool: + """A property indicating whether the model has been trained""" + + @abstractmethod + def predict(self, criterions: dict[str, float]) -> np.ndarray: + """Abstract method for implementing a model prediction""" + + @abstractmethod + def load_model(self, model_path: str) -> None: + """An abstract method for implementing model loading""" diff --git a/rework_pysatl_mpest/preprocessing/components_family/classifier_models/classifier_models.py b/rework_pysatl_mpest/preprocessing/components_family/classifier_models/classifier_models.py new file mode 100644 index 0000000..24bd055 --- /dev/null +++ b/rework_pysatl_mpest/preprocessing/components_family/classifier_models/classifier_models.py @@ -0,0 +1,34 @@ +"""Module which contains all supported classifier models""" + +__author__ = "Mark Dubrovchenko" +__copyright__ = "Copyright (c) 2025 PySATL project" +__license__ = "SPDX-License-Identifier: 
MIT" + +import numpy as np +import xgboost as xgb +from rework_pysatl_mpest.preprocessing.components_family.classifier_models.classifier_interface import ( + IClassifier, +) + + +class XGBClassifier(IClassifier): + """Implementation of an XGBoost-based classifier""" + + def __init__(self) -> None: + self.model = xgb.Booster() + self._is_fitted: bool = False + + @property + def is_fitted(self) -> bool: + return self._is_fitted + + def load_model(self, model_path: str) -> None: + self.model.load_model(model_path) + self._is_fitted = True + + def predict(self, criterions: dict[str, float]) -> np.ndarray: + feature_names = list(criterions.keys()) + values = [criterions[name] for name in feature_names] + features = xgb.DMatrix([values], feature_names=feature_names) + + return self.model.predict(features)
diff --git a/rework_pysatl_mpest/preprocessing/components_family/components_family.py b/rework_pysatl_mpest/preprocessing/components_family/components_family.py new file mode 100644 index 0000000..8e830da --- /dev/null +++ b/rework_pysatl_mpest/preprocessing/components_family/components_family.py @@ -0,0 +1,91 @@ +"""Module which contains a method for initial estimation of the mixture components family based on a mixture classifier""" + +__author__ = "Mark Dubrovchenko" +__copyright__ = "Copyright (c) 2025 PySATL project" +__license__ = "SPDX-License-Identifier: MIT" + +import numpy as np +from rework_pysatl_mpest.distributions import ContinuousDistribution +from rework_pysatl_mpest.preprocessing.components_family.mixture_classifiers.mixture_classifier import ( + MixtureClassifierModel, +) +from rework_pysatl_mpest.preprocessing.components_number.abstract_estimator import AComponentsNumber + + +class ComponentsFamily: + """ + ComponentsFamily + + Parameters + ---------- + :recognition_model: MixtureClassifierModel — Mixture Classifier Model + :top_k: int — Number of most likely mixture configurations to return + :components_number: AComponentsNumber | None — Method for estimating the number of components + :state: int | None — Determines random generation for some criterions + """ + + def __init__( + self, + recognition_model: MixtureClassifierModel, + top_k: int, + components_number: AComponentsNumber | None = None, + state: int | None = None, + ) -> None: + self.model = recognition_model + self.top_k = top_k + self.components_number = components_number + self.state = state + + def predict(self, X: np.ndarray, k: int | list[int] | None = None) -> list[list[ContinuousDistribution]]: + """ + Function for evaluating the top k most probable configurations + + Parameters + ---------- + :X: np.ndarray — Sample Data + :k: int | list[int] | None — The set number of components of the mixture + + k is a specific number, a list of admissible numbers, or None + (to determine the number of components using the specified method, + or, if no method is specified, to use the entire range from 1 to 10 components) + + Returns + ---------- + list[list[ContinuousDistribution]] + + — List of mixture configurations using distribution classes for further work with the mixture + """ + + def __get_components_n(k: None | int | list[int]) -> list[int]: + """Function that defines the boundaries of the possible number of mixture components""" + upper_bound = 10 + + if isinstance(k, int): + return [k] + + if isinstance(k, list): + return k + + if isinstance(self.components_number, AComponentsNumber): + comp_k = self.components_number.estimate(X) + return [max(comp_k - 1, 1), comp_k, min(comp_k + 1, upper_bound)] + + return [i for i in range(1, upper_bound + 1)] + + 
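+        # Fix the global NumPy random seed before feature extraction (some criterions rely on random sampling)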
np.random.seed(self.state) + + n = __get_components_n(k) + prob = self.model.predict(X) + result: list[list[ContinuousDistribution]] = [] + + for i in np.argsort(prob)[::-1]: + if len(result) == self.top_k: + break + + components = self.model.transform(i) + if len(components) not in n: + continue + + result.append(components) + + return result diff --git a/rework_pysatl_mpest/preprocessing/components_family/criterions/__init__.py b/rework_pysatl_mpest/preprocessing/components_family/criterions/__init__.py new file mode 100644 index 0000000..ad94868 --- /dev/null +++ b/rework_pysatl_mpest/preprocessing/components_family/criterions/__init__.py @@ -0,0 +1,155 @@ +"""Module which contains all available mixture classifier criterions for mixture classifiers""" + +__author__ = "Mark Dubrovchenko" +__copyright__ = "Copyright (c) 2025 PySATL project" +__license__ = "SPDX-License-Identifier: MIT" + +from rework_pysatl_mpest.preprocessing.components_family.criterions.abstract_criterion import ( + AHistClassifierCriterion, + APeaksClassifierCriterion, + ASampleClassifierCriterion, +) +from rework_pysatl_mpest.preprocessing.components_family.criterions.frequency_criterions import ( + CDct, + CDctEnergy, + CSpecBandwidth, + CSpecCentroid, + CSpecDecrease, + CSpecEnergy, + CSpecEntropy, + CSpecFlatness, + CSpecRolloff, + CSpecSlope, + CWaveletEnergy, + CWaveletEntropy, + CWaveletLarge, + CWaveletMean, + CWaveletStd, +) +from rework_pysatl_mpest.preprocessing.components_family.criterions.hist_criterions import ( + CHistEnergy, + CHistEntropy, + CHistFlat, + CHistLength, + CHistUniform, + CSobelCount, + CSobelMax, + CSobelMean, + CSobelMin, +) +from rework_pysatl_mpest.preprocessing.components_family.criterions.peaks_criterions import ( + CPeaksCount, + CPeaksDistMax, + CPeaksDistMean, + CPeaksDistMin, + CPeaksFirst, + CPeaksLast, + CPeaksMax, + CPeaksMean, + CPeaksMin, + CPeaksWidthMax, + CPeaksWidthMean, + CPeaksWidthMin, + CValleysDistMax, + CValleysDistMean, + CValleysDistMin, + CValleysMax, + CValleysMean, + CValleysMin, + CValleysWidthMax, + CValleysWidthMean, + CValleysWidthMin, +) +from rework_pysatl_mpest.preprocessing.components_family.criterions.sample_criterions import ( + CBootKurt, + CHillAbs, + CIqr, + CKurt, + CKurtMoors, + CLogRatio, + CMaxZscore, + CNegativeValue, + COutlierFraction, + CRange, + CSkew, + CSkewBowley, + CSpacingGap, + CSpacingGini, +) + +base_criterions: list[ASampleClassifierCriterion | APeaksClassifierCriterion | AHistClassifierCriterion] = [ + CIqr(), + CKurt(), + CSkew(), + CNegativeValue(), + CRange(), + CMaxZscore(), + CPeaksCount(), + CPeaksMean(), + CValleysMean(), + CPeaksMax(), + CPeaksMin(), + CPeaksWidthMax(), + CPeaksWidthMin(), + CPeaksWidthMean(), + CPeaksDistMax(), + CPeaksDistMin(), + CPeaksDistMean(), + CValleysMax(), + CValleysMin(), + CValleysWidthMax(), + CValleysWidthMin(), + CValleysWidthMean(), + CValleysDistMax(), + CValleysDistMin(), + CValleysDistMean(), + CHistEnergy(), + CHistEntropy(), + CHistLength(), + CHistUniform(), + CSpecEnergy(), + CSpecCentroid(), + CSpecSlope(), + CSpecRolloff(), + CSpecBandwidth(), + CSpecFlatness(), + CSpecEntropy(), + CSpecDecrease(), + CDct(1), + CDct(2), + CDct(3), + CDctEnergy(), + CPeaksFirst(), + CPeaksLast(), + CBootKurt(), + CHillAbs(), + CSkewBowley(), + CKurtMoors(), + CLogRatio(), + CHistFlat(), + COutlierFraction(), + CSpacingGap(), + CSpacingGini(), + CSobelCount(), + CSobelMean(), + CSobelMax(), + CSobelMin(), + CWaveletEnergy(1), + CWaveletEnergy(2), + CWaveletEnergy(3), + CWaveletEntropy(1), + 
CWaveletEntropy(2), + CWaveletEntropy(3), + CWaveletMean(1), + CWaveletMean(2), + CWaveletMean(3), + CWaveletStd(1), + CWaveletStd(2), + CWaveletStd(3), + CWaveletLarge(1), + CWaveletLarge(2), + CWaveletLarge(3), + CWaveletLarge(1, 0.01), + CWaveletLarge(2, 0.01), + CWaveletLarge(3, 0.01), +] diff --git a/rework_pysatl_mpest/preprocessing/components_family/criterions/abstract_criterion.py b/rework_pysatl_mpest/preprocessing/components_family/criterions/abstract_criterion.py new file mode 100644 index 0000000..84b3956 --- /dev/null +++ b/rework_pysatl_mpest/preprocessing/components_family/criterions/abstract_criterion.py @@ -0,0 +1,59 @@ +"""Module which contains abstract classes of mixture classifier criterions""" + +__author__ = "Mark Dubrovchenko" +__copyright__ = "Copyright (c) 2025 PySATL project" +__license__ = "SPDX-License-Identifier: MIT" + +from abc import ABC, abstractmethod + +import numpy as np +from scipy.signal import find_peaks + + +class ASampleClassifierCriterion(ABC): + """Abstract class of sample feature for mixture classifier""" + + @property + @abstractmethod + def name(self) -> str: + """Name getter""" + + @abstractmethod + def score(self, X: np.ndarray) -> float: + """Function evaluating sample feature for a mixture classifier""" + + +class APeaksClassifierCriterion(ABC): + """Abstract class of peaks feature for mixture classifier""" + + @property + @abstractmethod + def name(self) -> str: + """Name getter""" + + @staticmethod + def _get_peaks(hist: np.ndarray, is_valleys: bool = False) -> list[np.ndarray]: + hist_prep = np.concatenate((np.zeros(1), hist, np.zeros(1))) + if not is_valleys: + peaks, _ = find_peaks(hist_prep) + return [hist_prep, peaks] + + valleys, _ = find_peaks(-hist_prep) + return [hist_prep, valleys] + + @abstractmethod + def score(self, hist: np.ndarray) -> float: + """Function evaluating peaks feature for a mixture classifier""" + + +class AHistClassifierCriterion(ABC): + """Abstract class of hist feature for mixture classifier""" + + @property + @abstractmethod + def name(self) -> str: + """Name getter""" + + @abstractmethod + def score(self, hist: np.ndarray) -> float: + """Function evaluating hist feature for a mixture classifier""" diff --git a/rework_pysatl_mpest/preprocessing/components_family/criterions/frequency_criterions.py b/rework_pysatl_mpest/preprocessing/components_family/criterions/frequency_criterions.py new file mode 100644 index 0000000..39b03f6 --- /dev/null +++ b/rework_pysatl_mpest/preprocessing/components_family/criterions/frequency_criterions.py @@ -0,0 +1,279 @@ +"""Module which contains frequency features (from sound recognition) for a mixture classifier""" + +__author__ = "Mark Dubrovchenko" +__copyright__ = "Copyright (c) 2025 PySATL project" +__license__ = "SPDX-License-Identifier: MIT" + +import numpy as np +from pywt import wavedec +from rework_pysatl_mpest.preprocessing.components_family.criterions.abstract_criterion import ( + AHistClassifierCriterion, +) +from scipy.fft import rfft +from scipy.fftpack import dct +from scipy.signal import periodogram + + +class CDct(AHistClassifierCriterion): + def __init__(self, dct_type: int) -> None: + self.dct_type = dct_type + + @property + def name(self) -> str: + return f"DCT C{self.dct_type} Criterion" + + def score(self, hist: np.ndarray) -> float: + hist = hist / np.sum(hist) + + d = dct(hist, norm="ortho") + return d[self.dct_type] if len(d) > self.dct_type else 0 + + +class CDctEnergy(AHistClassifierCriterion): + @property + def name(self) -> str: + return "DCT Energy 
Criterion" + + def score(self, hist: np.ndarray) -> float: + k = 4 + hist = hist / np.sum(hist) + + d = dct(hist, norm="ortho") + return np.sum(d[k:] ** 2) if len(d) > k else 0 + + +class CSpecBandwidth(AHistClassifierCriterion): + def __init__(self, noise: float = 10**-12) -> None: + self.noise = noise + + @property + def name(self) -> str: + return "Spectral Bandwidth Criterion" + + def score(self, hist: np.ndarray) -> float: + hist = hist / np.sum(hist) + + spec = np.abs(rfft(hist)) + freq = np.arange(len(spec)) + centroid = np.sum(spec * freq) / (np.sum(spec) + self.noise) + return np.sqrt(np.sum((freq - centroid) ** 2 * spec) / (np.sum(spec) + self.noise)) + + +class CSpecCentroid(AHistClassifierCriterion): + def __init__(self, noise: float = 10**-12) -> None: + self.noise = noise + + @property + def name(self) -> str: + return "Spectral Centroid Criterion" + + def score(self, hist: np.ndarray) -> float: + hist = hist / np.sum(hist) + + spec = np.abs(rfft(hist)) + freq = np.arange(len(spec)) + return np.sum(spec * freq) / (np.sum(spec) + self.noise) + + +class CSpecDecrease(AHistClassifierCriterion): + def __init__(self, noise: float = 10**-12) -> None: + self.noise = noise + + @property + def name(self) -> str: + return "Spectral Decrease Criterion" + + def score(self, hist: np.ndarray) -> float: + hist = hist / np.sum(hist) + + spec = np.abs(rfft(hist)) + if len(spec) <= 1: + return 0 + + m1 = spec[1:] + return np.sum((m1[1:] - m1[:-1]) / np.arange(1, len(m1))) / (np.sum(m1) + self.noise) + + +class CSpecEnergy(AHistClassifierCriterion): + @property + def name(self) -> str: + return "Spectral Energy Criterion" + + def score(self, hist: np.ndarray) -> float: + hist = hist / np.sum(hist) + + spec = rfft(hist) + return np.sum(np.abs(spec**2)) + + +class CSpecEntropy(AHistClassifierCriterion): + def __init__(self, noise: float = 10**-12) -> None: + self.noise = noise + + @property + def name(self) -> str: + return "Spectral Entropy Criterion" + + def score(self, hist: np.ndarray) -> float: + hist = hist / np.sum(hist) + + freq, psd = periodogram(hist) + psd = psd / (np.sum(psd) + self.noise) + return -np.sum(psd * np.log2(psd + self.noise)) + + +class CSpecFlatness(AHistClassifierCriterion): + def __init__(self, noise: float = 10**-12) -> None: + self.noise = noise + + @property + def name(self) -> str: + return "Spectral Flatness Criterion" + + def score(self, hist: np.ndarray) -> float: + hist = hist / np.sum(hist) + + spec = np.abs(rfft(hist)) + self.noise + gmean = np.exp(np.mean(np.log(spec))) + return gmean / np.mean(spec) + + +class CSpecRolloff(AHistClassifierCriterion): + def __init__(self, roll: float = 0.85) -> None: + self.roll = roll + + @property + def name(self) -> str: + return "Spectral Rolloff Criterion" + + def score(self, hist: np.ndarray) -> float: + hist = hist / np.sum(hist) + + spec = np.abs(rfft(hist)) + cumsum = np.cumsum(spec) + return np.where(cumsum >= self.roll * cumsum[-1])[0][0] + + +class CSpecSlope(AHistClassifierCriterion): + def __init__(self, noise: float = 10**-12) -> None: + self.noise = noise + + @property + def name(self) -> str: + return "Spectral Slope Criterion" + + def score(self, hist: np.ndarray) -> float: + hist = hist / np.sum(hist) + + spec = np.abs(rfft(hist)) + freq = np.arange(len(spec)) + fm, sm = freq.mean(), spec.mean() + return np.sum((freq - fm) * (spec - sm)) / (np.sum((freq - fm) ** 2) + self.noise) + + +class CWaveletEnergy(AHistClassifierCriterion): + def __init__(self, level: int = 1, level_max: int = 3, wavelet: str = 
"haar") -> None: + self.level = level + self.level_max = level_max + self.wavelet = wavelet + + @property + def name(self) -> str: + return f"Wavelet {self.level} Energy Criterion" + + def score(self, hist: np.ndarray) -> float: + hist = hist / np.sum(hist) + + coeffs = wavedec(hist, self.wavelet, level=self.level_max) + total = sum(np.sum(c**2) for c in coeffs) + + return np.sum(coeffs[self.level - 1] ** 2) / total + + +class CWaveletEntropy(AHistClassifierCriterion): + def __init__( + self, + level: int = 1, + level_max: int = 3, + wavelet: str = "haar", + noise: float = 10**-12, + ) -> None: + self.level = level + self.level_max = level_max + self.wavelet = wavelet + self.noise = noise + + @property + def name(self) -> str: + return f"Wavelet {self.level} Entropy Criterion" + + def score(self, hist: np.ndarray) -> float: + hist = hist / np.sum(hist) + + coeffs = wavedec(hist, self.wavelet, level=self.level_max) + c_abs = np.abs(coeffs[self.level - 1]) + c_norm = c_abs / (c_abs.sum() + self.noise) + + return -np.sum(c_norm * np.log(c_norm + self.noise)) + + +class CWaveletLarge(AHistClassifierCriterion): + def __init__( + self, + level: int = 1, + threshold: float = 0.1, + level_max: int = 3, + wavelet: str = "haar", + ) -> None: + self.level = level + self.level_max = level_max + self.threshold = threshold + self.wavelet = wavelet + + @property + def name(self) -> str: + return f"Wavelet {self.level} Large {self.threshold} Criterion" + + def score(self, hist: np.ndarray) -> float: + hist = hist / np.sum(hist) + + coeffs = wavedec(hist, self.wavelet, level=self.level_max) + c_abs = np.abs(coeffs[self.level - 1]) + + return np.mean(c_abs > np.max(c_abs) * self.threshold) + + +class CWaveletMean(AHistClassifierCriterion): + def __init__(self, level: int = 1, level_max: int = 3, wavelet: str = "haar") -> None: + self.level = level + self.level_max = level_max + self.wavelet = wavelet + + @property + def name(self) -> str: + return f"Wavelet {self.level} Mean Criterion" + + def score(self, hist: np.ndarray) -> float: + hist = hist / np.sum(hist) + + coeffs = wavedec(hist, self.wavelet, level=self.level_max) + + return np.mean(coeffs[self.level - 1]) + + +class CWaveletStd(AHistClassifierCriterion): + def __init__(self, level: int = 1, level_max: int = 3, wavelet: str = "haar") -> None: + self.level = level + self.level_max = level_max + self.wavelet = wavelet + + @property + def name(self) -> str: + return f"Wavelet {self.level} Std Criterion" + + def score(self, hist: np.ndarray) -> float: + hist = hist / np.sum(hist) + + coeffs = wavedec(hist, self.wavelet, level=self.level_max) + + return np.std(coeffs[self.level - 1]) diff --git a/rework_pysatl_mpest/preprocessing/components_family/criterions/hist_criterions.py b/rework_pysatl_mpest/preprocessing/components_family/criterions/hist_criterions.py new file mode 100644 index 0000000..023eb3c --- /dev/null +++ b/rework_pysatl_mpest/preprocessing/components_family/criterions/hist_criterions.py @@ -0,0 +1,131 @@ +"""Module which contains histogram features for a mixture classifier""" + +__author__ = "Mark Dubrovchenko" +__copyright__ = "Copyright (c) 2025 PySATL project" +__license__ = "SPDX-License-Identifier: MIT" + +import numpy as np +from rework_pysatl_mpest.preprocessing.components_family.criterions.abstract_criterion import ( + AHistClassifierCriterion, +) +from scipy.ndimage import sobel +from scipy.spatial.distance import jensenshannon + + +class CHistEnergy(AHistClassifierCriterion): + @property + def name(self) -> str: + return "Hist 
Energy Criterion" + + def score(self, hist: np.ndarray) -> float: + hist = hist / np.sum(hist) + return np.sum(hist**2) + + +class CHistEntropy(AHistClassifierCriterion): + def __init__(self, noise: float = 10**-12) -> None: + self.noise = noise + + @property + def name(self) -> str: + return "Hist Entropy Criterion" + + def score(self, hist: np.ndarray) -> float: + hist = hist / np.sum(hist) + return -np.sum(hist * np.log2(hist + self.noise)) + + +class CHistFlat(AHistClassifierCriterion): + def __init__(self, rate: float = 0.05) -> None: + self.rate = rate + + @property + def name(self) -> str: + return "Hist Flatness Criterion" + + def score(self, hist: np.ndarray) -> float: + hist = hist / np.sum(hist) + return np.mean(np.abs(np.diff(hist)) < self.rate) + + +class CHistLength(AHistClassifierCriterion): + @property + def name(self) -> str: + return "Hist Length Criterion" + + def score(self, hist: np.ndarray) -> float: + hist = hist / np.sum(hist) + return np.sum(np.abs(np.diff(hist))) + + +class CHistUniform(AHistClassifierCriterion): + @property + def name(self) -> str: + return "Hist Uniform Criterion" + + def score(self, hist: np.ndarray) -> float: + n = len(hist) + hist = hist / np.sum(hist) + + uniform = np.ones(n) / n + return jensenshannon(hist, uniform) + + +class CSobelCount(AHistClassifierCriterion): + def __init__(self, threshold: float = 0.01) -> None: + self.threshold = threshold + + @property + def name(self) -> str: + return "Sobel Count Criterion" + + def score(self, hist: np.ndarray) -> float: + hist = hist / np.sum(hist) + + sob = sobel(hist) + return np.mean(np.abs(sob) > np.max(np.abs(sob)) * self.threshold) + + +class CSobelMax(AHistClassifierCriterion): + def __init__(self, threshold: float = 0.01) -> None: + self.threshold = threshold + + @property + def name(self) -> str: + return "Sobel Max Criterion" + + def score(self, hist: np.ndarray) -> float: + hist = hist / np.sum(hist) + + sob = sobel(hist) + return np.max(np.abs(sob)) + + +class CSobelMean(AHistClassifierCriterion): + def __init__(self, threshold: float = 0.01) -> None: + self.threshold = threshold + + @property + def name(self) -> str: + return "Sobel Mean Criterion" + + def score(self, hist: np.ndarray) -> float: + hist = hist / np.sum(hist) + + sob = sobel(hist) + return np.mean(np.abs(sob)) + + +class CSobelMin(AHistClassifierCriterion): + def __init__(self, threshold: float = 0.01) -> None: + self.threshold = threshold + + @property + def name(self) -> str: + return "Sobel Min Criterion" + + def score(self, hist: np.ndarray) -> float: + hist = hist / np.sum(hist) + + sob = sobel(hist) + return np.min(np.abs(sob)) diff --git a/rework_pysatl_mpest/preprocessing/components_family/criterions/peaks_criterions.py b/rework_pysatl_mpest/preprocessing/components_family/criterions/peaks_criterions.py new file mode 100644 index 0000000..9c4ae96 --- /dev/null +++ b/rework_pysatl_mpest/preprocessing/components_family/criterions/peaks_criterions.py @@ -0,0 +1,227 @@ +"""Module which contains peaks features for a mixture classifier""" + +__author__ = "Mark Dubrovchenko" +__copyright__ = "Copyright (c) 2025 PySATL project" +__license__ = "SPDX-License-Identifier: MIT" + +import numpy as np +from rework_pysatl_mpest.preprocessing.components_family.criterions.abstract_criterion import ( + APeaksClassifierCriterion, +) +from scipy.signal import peak_widths + + +class CPeaksCount(APeaksClassifierCriterion): + @property + def name(self) -> str: + return "Peaks Count Criterion" + + def score(self, hist: np.ndarray) -> 
float: + peaks = self._get_peaks(hist)[1] + return len(peaks) + + +class CPeaksDistMax(APeaksClassifierCriterion): + @property + def name(self) -> str: + return "Peaks Distance Max Criterion" + + def score(self, hist: np.ndarray) -> float: + peaks = self._get_peaks(hist)[1] + return np.max(np.abs(np.diff(peaks)) - 1) / len(hist) if len(peaks) > 1 else 0 + + +class CPeaksDistMean(APeaksClassifierCriterion): + @property + def name(self) -> str: + return "Peaks Distance Mean Criterion" + + def score(self, hist: np.ndarray) -> float: + peaks = self._get_peaks(hist)[1] + return np.mean(np.abs(np.diff(peaks)) - 1) / len(hist) if len(peaks) > 1 else 0 + + +class CPeaksDistMin(APeaksClassifierCriterion): + @property + def name(self) -> str: + return "Peaks Distance Min Criterion" + + def score(self, hist: np.ndarray) -> float: + peaks = self._get_peaks(hist)[1] + return np.min(np.abs(np.diff(peaks)) - 1) / len(hist) if len(peaks) > 1 else 0 + + +class CPeaksFirst(APeaksClassifierCriterion): + @property + def name(self) -> str: + return "Peaks First Criterion" + + def score(self, hist: np.ndarray) -> float: + peaks = self._get_peaks(hist)[1] + return float(1 in peaks) + + +class CPeaksLast(APeaksClassifierCriterion): + @property + def name(self) -> str: + return "Peaks Last Criterion" + + def score(self, hist: np.ndarray) -> float: + peaks = self._get_peaks(hist)[1] + return float(len(hist) in peaks) + + +class CPeaksMax(APeaksClassifierCriterion): + @property + def name(self) -> str: + return "Peaks Max Criterion" + + def score(self, hist: np.ndarray) -> float: + hist, peaks = self._get_peaks(hist) + hist /= np.sum(hist) + return hist[peaks].max() + + +class CPeaksMean(APeaksClassifierCriterion): + @property + def name(self) -> str: + return "Peaks Mean Criterion" + + def score(self, hist: np.ndarray) -> float: + hist, peaks = self._get_peaks(hist) + hist /= np.sum(hist) + return hist[peaks].mean() + + +class CPeaksMin(APeaksClassifierCriterion): + @property + def name(self) -> str: + return "Peaks Min Criterion" + + def score(self, hist: np.ndarray) -> float: + hist, peaks = self._get_peaks(hist) + hist /= np.sum(hist) + return hist[peaks].min() + + +class CPeaksWidthMax(APeaksClassifierCriterion): + @property + def name(self) -> str: + return "Peaks Width Max Criterion" + + def score(self, hist: np.ndarray) -> float: + hist, peaks = self._get_peaks(hist) + return np.max(peak_widths(hist, peaks)[0]) / (len(hist) - 2) + + +class CPeaksWidthMean(APeaksClassifierCriterion): + @property + def name(self) -> str: + return "Peaks Width Mean Criterion" + + def score(self, hist: np.ndarray) -> float: + hist, peaks = self._get_peaks(hist) + return np.mean(peak_widths(hist, peaks)[0]) / (len(hist) - 2) + + +class CPeaksWidthMin(APeaksClassifierCriterion): + @property + def name(self) -> str: + return "Peaks Width Min Criterion" + + def score(self, hist: np.ndarray) -> float: + hist, peaks = self._get_peaks(hist) + return np.min(peak_widths(hist, peaks)[0]) / (len(hist) - 2) + + +class CValleysDistMax(APeaksClassifierCriterion): + @property + def name(self) -> str: + return "Valleys Distance Max Criterion" + + def score(self, hist: np.ndarray) -> float: + valleys = self._get_peaks(hist, True)[1] + return np.max(np.abs(np.diff(valleys)) - 1) / len(hist) if len(valleys) > 1 else 0 + + +class CValleysDistMean(APeaksClassifierCriterion): + @property + def name(self) -> str: + return "Valleys Distance Mean Criterion" + + def score(self, hist: np.ndarray) -> float: + valleys = self._get_peaks(hist, True)[1] + 
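+        # mean gap (in bins) between adjacent valleys, normalised by the histogram length; 0 if there are fewer than two valleys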
return np.mean(np.abs(np.diff(valleys)) - 1) / len(hist) if len(valleys) > 1 else 0 + + +class CValleysDistMin(APeaksClassifierCriterion): + @property + def name(self) -> str: + return "Valleys Distance Min Criterion" + + def score(self, hist: np.ndarray) -> float: + valleys = self._get_peaks(hist, True)[1] + return np.min(np.abs(np.diff(valleys)) - 1) / len(hist) if len(valleys) > 1 else 0 + + +class CValleysMax(APeaksClassifierCriterion): + @property + def name(self) -> str: + return "Valleys Max Criterion" + + def score(self, hist: np.ndarray) -> float: + hist, valleys = self._get_peaks(hist, True) + hist /= np.sum(hist) + return hist[valleys].max() if len(valleys) != 0 else 0 + + +class CValleysMean(APeaksClassifierCriterion): + @property + def name(self) -> str: + return "Valleys Mean Criterion" + + def score(self, hist: np.ndarray) -> float: + hist, valleys = self._get_peaks(hist, True) + hist /= np.sum(hist) + return hist[valleys].mean() if len(valleys) != 0 else 0 + + +class CValleysMin(APeaksClassifierCriterion): + @property + def name(self) -> str: + return "Valleys Min Criterion" + + def score(self, hist: np.ndarray) -> float: + hist, valleys = self._get_peaks(hist, True) + hist /= np.sum(hist) + return hist[valleys].min() if len(valleys) != 0 else 0 + + +class CValleysWidthMax(APeaksClassifierCriterion): + @property + def name(self) -> str: + return "Valleys Width Max Criterion" + + def score(self, hist: np.ndarray) -> float: + hist, valleys = self._get_peaks(hist, True) + return np.max(peak_widths(-hist, valleys)[0]) / (len(hist) - 2) if len(valleys) != 0 else 0 + + +class CValleysWidthMean(APeaksClassifierCriterion): + @property + def name(self) -> str: + return "Valleys Width Mean Criterion" + + def score(self, hist: np.ndarray) -> float: + hist, valleys = self._get_peaks(hist, True) + return np.mean(peak_widths(-hist, valleys)[0]) / (len(hist) - 2) if len(valleys) != 0 else 0 + + +class CValleysWidthMin(APeaksClassifierCriterion): + @property + def name(self) -> str: + return "Valleys Width Min Criterion" + + def score(self, hist: np.ndarray) -> float: + hist, valleys = self._get_peaks(hist, True) + return np.min(peak_widths(-hist, valleys)[0]) / (len(hist) - 2) if len(valleys) != 0 else 0 diff --git a/rework_pysatl_mpest/preprocessing/components_family/criterions/sample_criterions.py b/rework_pysatl_mpest/preprocessing/components_family/criterions/sample_criterions.py new file mode 100644 index 0000000..a7749a2 --- /dev/null +++ b/rework_pysatl_mpest/preprocessing/components_family/criterions/sample_criterions.py @@ -0,0 +1,175 @@ +"""Module which contains sample features for a mixture classifier""" + +__author__ = "Mark Dubrovchenko" +__copyright__ = "Copyright (c) 2025 PySATL project" +__license__ = "SPDX-License-Identifier: MIT" + +import numpy as np +from rework_pysatl_mpest.preprocessing.components_family.criterions.abstract_criterion import ( + ASampleClassifierCriterion, +) +from scipy.stats import iqr, kurtosis, skew, zscore + + +class CKurt(ASampleClassifierCriterion): + @property + def name(self) -> str: + return "Kurtosis Criterion" + + def score(self, X: np.ndarray) -> float: + result = kurtosis(X) + return kurtosis(X) if not np.isnan(result) else 0 + + +class CNegativeValue(ASampleClassifierCriterion): + @property + def name(self) -> str: + return "Negative Value Criterion" + + def score(self, X: np.ndarray) -> float: + return float(np.min(X) < 0) + + +class CIqr(ASampleClassifierCriterion): + @property + def name(self) -> str: + return "IQR Criterion" + + def 
score(self, X: np.ndarray) -> float: + return iqr(X) / np.median(X) + + +class CKurtMoors(ASampleClassifierCriterion): + @property + def name(self) -> str: + return "Kurtosis Moors Criterion" + + def score(self, X: np.ndarray) -> float: + p1, p2, p3, p4, p5, p6 = np.percentile(X, [12.5, 25, 37.5, 62.5, 75, 87.5]) + result = ((p6 - p2) + (p5 - p3)) / (p4 - p2) + return result if not np.isnan(result) else 0 + + +class CLogRatio(ASampleClassifierCriterion): + @property + def name(self) -> str: + return "Log Extreme Ratio Criterion" + + def score(self, X: np.ndarray) -> float: + xmin, xmax = np.min(X), np.max(X) + median = np.median(X) + result = np.log((xmax - median) / (median - xmin)) + return result if not np.isnan(result) else 0 + + +class COutlierFraction(ASampleClassifierCriterion): + def __init__(self, k: float = 3) -> None: + self.k = k + + @property + def name(self) -> str: + return "Outlier Fraction Criterion" + + def score(self, X: np.ndarray) -> float: + mu, sigma = np.mean(X), np.std(X) + return np.mean(np.abs(X - mu) > self.k * sigma) + + +class CRange(ASampleClassifierCriterion): + @property + def name(self) -> str: + return "Range Criterion" + + def score(self, X: np.ndarray) -> float: + return np.ptp(X) + + +class CSkew(ASampleClassifierCriterion): + @property + def name(self) -> str: + return "Skewness Criterion" + + def score(self, X: np.ndarray) -> float: + result = skew(X) + return result if not np.isnan(result) else 0 + + +class CSkewBowley(ASampleClassifierCriterion): + @property + def name(self) -> str: + return "Skewness Bowley Criterion" + + def score(self, X: np.ndarray) -> float: + p25, p50, p75 = np.percentile(X, [25, 50, 75]) + result = (p75 + p25 - 2 * p50) / (p75 - p25) + return result if not np.isnan(result) else 0 + + +class CSpacingGap(ASampleClassifierCriterion): + def __init__(self, rate: float = 5) -> None: + self.rate = rate + + @property + def name(self) -> str: + return "Spacing Gap Criterion" + + def score(self, X: np.ndarray) -> float: + diff = np.diff(np.sort(X)) + dmedian = np.median(diff) + return np.mean(diff > self.rate * dmedian) + + +class CSpacingGini(ASampleClassifierCriterion): + @property + def name(self) -> str: + return "Spacing Gini Criterion" + + def score(self, X: np.ndarray) -> float: + diff = np.diff(np.sort(X)) + diff = np.sort(diff) + n = len(diff) + index = np.arange(1, n + 1) + result = np.sum((2 * index - n - 1) * diff) / (np.sum(diff) * n) + return result if not np.isnan(result) else 0 + + +class CMaxZscore(ASampleClassifierCriterion): + @property + def name(self) -> str: + return "Z-Score Criterion" + + def score(self, X: np.ndarray) -> float: + result = np.max(np.abs(zscore(X))) + return result if not np.isnan(result) else 0 + + +class CBootKurt(ASampleClassifierCriterion): + def __init__(self, n_boot: int = 200, state: int | None = None) -> None: + self.n_boot = n_boot + self.state = state + + @property + def name(self) -> str: + return "Bootstrap Kurtosis Criterion" + + def score(self, X: np.ndarray) -> float: + np.random.seed(self.state) + + n = len(X) + means = [kurtosis(np.random.choice(X, size=n, replace=True)) for _ in range(self.n_boot)] + result = np.var(means) + return result if not np.isnan(result) else 0 + + +class CHillAbs(ASampleClassifierCriterion): + @property + def name(self) -> str: + return "Hill Abs Criterion" + + def score(self, X: np.ndarray) -> float: + X = np.sort(np.abs(X)) + k = int(len(X) ** 0.5) + + x_tail = X[-k:] + x_min = X[-k - 1] + return (1 / k) * np.sum(np.log(x_tail) - np.log(x_min)) diff 
--git a/rework_pysatl_mpest/preprocessing/components_family/mixture_classifiers/__init__.py b/rework_pysatl_mpest/preprocessing/components_family/mixture_classifiers/__init__.py new file mode 100644 index 0000000..f2093d4 --- /dev/null +++ b/rework_pysatl_mpest/preprocessing/components_family/mixture_classifiers/__init__.py @@ -0,0 +1,12 @@ +"""Module which contains all available mixture classifiers""" + +__author__ = "Mark Dubrovchenko" +__copyright__ = "Copyright (c) 2025 PySATL project" +__license__ = "SPDX-License-Identifier: MIT" + +from rework_pysatl_mpest.preprocessing.components_family.mixture_classifiers.base_xgb.base_xgb import ( + XGBBaseModel, +) +from rework_pysatl_mpest.preprocessing.components_family.mixture_classifiers.mixture_classifier import ( + MixtureClassifierModel, +) diff --git a/rework_pysatl_mpest/preprocessing/components_family/mixture_classifiers/base_xgb/base_xgb.py b/rework_pysatl_mpest/preprocessing/components_family/mixture_classifiers/base_xgb/base_xgb.py new file mode 100644 index 0000000..eab4e27 --- /dev/null +++ b/rework_pysatl_mpest/preprocessing/components_family/mixture_classifiers/base_xgb/base_xgb.py @@ -0,0 +1,34 @@ +"""Module which contains XGB-base mixture classifier model""" + +__author__ = "Mark Dubrovchenko" +__copyright__ = "Copyright (c) 2025 PySATL project" +__license__ = "SPDX-License-Identifier: MIT" + +from pathlib import Path + +from rework_pysatl_mpest.distributions import Beta, Cauchy, Exponential, Normal, Uniform, Weibull +from rework_pysatl_mpest.preprocessing.components_family.classifier_criterions import ( + MixtureClassifierCriterions, +) +from rework_pysatl_mpest.preprocessing.components_family.classifier_models.classifier_models import ( + XGBClassifier, +) +from rework_pysatl_mpest.preprocessing.components_family.mixture_classifiers.mixture_classifier import ( + MixtureClassifierModel, +) + +XGBBaseModel = MixtureClassifierModel( + XGBClassifier(), + "https://drive.google.com/uc?id=1dNWfD7rRcCLawt9rJHDfCaPV7piE6jFB", + str(Path(__file__).parent / "xgb_model.ubj"), + str(Path(__file__).parent / "labels.csv"), + MixtureClassifierCriterions(), + { + "G": Normal(0.0, 1.0), + "W": Weibull(1.0, 0.0, 1.0), + "U": Uniform(0.0, 1.0), + "C": Cauchy(0.0, 1.0), + "E": Exponential(0.0, 1.0), + "B": Beta(0.0, 0.0, 0.0, 1.0), + }, +) diff --git a/rework_pysatl_mpest/preprocessing/components_family/mixture_classifiers/base_xgb/labels.csv b/rework_pysatl_mpest/preprocessing/components_family/mixture_classifiers/base_xgb/labels.csv new file mode 100644 index 0000000..cce5b36 --- /dev/null +++ b/rework_pysatl_mpest/preprocessing/components_family/mixture_classifiers/base_xgb/labels.csv @@ -0,0 +1,106 @@ +Labels +B +B B +B B B +B B B B +B B B B B +B B B B B B +B B B B B B B +B B B B B B B B +B B B B B B B B B +B B B B B B B B B B +B B C +B B U +B C +B C C +B U +B U U +C +C C +C C C +C C C C +C C C C C +C C C C C C +C C C C C C C +C C C C C C C C +C C C C C C C C C +C C C C C C C C C C +E +E B +E B B +E C +E C C +E E +E E B +E E C +E E E +E E E E +E E E E E +E E E E E E +E E E E E E E +E E E E E E E E +E E E E E E E E E +E E E E E E E E E E +E E U +E U +E U U +G +G B +G B B +G C +G C C +G E +G E E +G G +G G B +G G C +G G E +G G G +G G G G +G G G G G +G G G G G G +G G G G G G G +G G G G G G G G +G G G G G G G G G +G G G G G G G G G G +G G U +G G W +G U +G U U +G W +G W W +U +U C +U C C +U U +U U C +U U U +U U U U +U U U U U +U U U U U U +U U U U U U U +U U U U U U U U +U U U U U U U U U +U U U U U U U U U U +W +W B +W B B +W C +W C C +W E +W E E 
+W U +W U U +W W +W W B +W W C +W W E +W W U +W W W +W W W W +W W W W W +W W W W W W +W W W W W W W +W W W W W W W W +W W W W W W W W W +W W W W W W W W W W
diff --git a/rework_pysatl_mpest/preprocessing/components_family/mixture_classifiers/mixture_classifier.py b/rework_pysatl_mpest/preprocessing/components_family/mixture_classifiers/mixture_classifier.py new file mode 100644 index 0000000..2f649ad --- /dev/null +++ b/rework_pysatl_mpest/preprocessing/components_family/mixture_classifiers/mixture_classifier.py @@ -0,0 +1,81 @@ +"""Module which contains the mixture classifier template""" + +__author__ = "Mark Dubrovchenko" +__copyright__ = "Copyright (c) 2025 PySATL project" +__license__ = "SPDX-License-Identifier: MIT" + +import os + +import gdown +import numpy as np +import pandas as pd +from rework_pysatl_mpest.distributions import ContinuousDistribution +from rework_pysatl_mpest.preprocessing.components_family.classifier_criterions import ( + MixtureClassifierCriterions, +) +from rework_pysatl_mpest.preprocessing.components_family.classifier_models import ( + IClassifier, +) +from sklearn.preprocessing import LabelEncoder + + +class MixtureClassifierModel: + """ + MixtureClassifierModel + + Parameters + ---------- + :model: IClassifier — Classifier Model + :model_link: str | None — Link for downloading the model if it is missing locally + :model_path: str — Path to the model file + :labels_path: str — Path to the labels file + :criterions: MixtureClassifierCriterions — Mixture Classifier Criterions + :distributions: dict[str, ContinuousDistribution] — Dictionary of used distributions + """ + + def __init__( + self, + model: IClassifier, + model_link: str | None, + model_path: str, + labels_path: str, + criterions: MixtureClassifierCriterions, + distributions: dict[str, ContinuousDistribution], + ) -> None: + self.model = model + self.model_link = model_link + self.model_path = model_path + + self.le = LabelEncoder() + self.labels_path = labels_path + + self.criterions = criterions + self.distributions = distributions + + def _download_model(self) -> None: + """Function for downloading the model from Google Drive if it has not been downloaded yet""" + + if not os.path.exists(self.model_path): + if not self.model_link: + raise FileNotFoundError("The model file was not found") + + gdown.download(self.model_link, self.model_path, quiet=False) + + def predict(self, X: np.ndarray) -> np.ndarray: + """Function for obtaining an unlabeled model prediction""" + + if not self.model.is_fitted: + self._download_model() + self.model.load_model(self.model_path) + + criterions = self.criterions.get_criterions(X) + return self.model.predict(criterions)[0] + + def transform(self, feature_id: int) -> list[ContinuousDistribution]: + """Function for converting a model prediction into an appropriate format""" + + if not hasattr(self.le, "classes_"): + labels = pd.read_csv(self.labels_path)["Labels"] + self.le.fit(labels) + + label = self.le.inverse_transform([feature_id])[0] + return [self.distributions[d] for d in label.split(" ")]
diff --git a/rework_pysatl_mpest/preprocessing/components_number/__init__.py b/rework_pysatl_mpest/preprocessing/components_number/__init__.py new file mode 100644 index 0000000..32f0511 --- /dev/null +++ b/rework_pysatl_mpest/preprocessing/components_number/__init__.py @@ -0,0 +1,12 @@ +"""Module which contains methods for estimating the number of components and abstract classes""" + +__author__ = "Mark Dubrovchenko" +__copyright__ = "Copyright (c) 2025 PySATL project" +__license__ = "SPDX-License-Identifier: MIT" + +from
rework_pysatl_mpest.preprocessing.components_number.abstract_estimator import AComponentsNumber +from rework_pysatl_mpest.preprocessing.components_number.elbow import Elbow +from rework_pysatl_mpest.preprocessing.components_number.peaks import Peaks +from rework_pysatl_mpest.preprocessing.components_number.silhouette import Silhouette + +__all__ = ["AComponentsNumber", "Elbow", "Peaks", "Silhouette"] diff --git a/rework_pysatl_mpest/preprocessing/components_number/abstract_estimator.py b/rework_pysatl_mpest/preprocessing/components_number/abstract_estimator.py new file mode 100644 index 0000000..393d38d --- /dev/null +++ b/rework_pysatl_mpest/preprocessing/components_number/abstract_estimator.py @@ -0,0 +1,22 @@ +"""Module which contains abstract class for methods estimating number of components in mixture""" + +__author__ = "Mark Dubrovchenko" +__copyright__ = "Copyright (c) 2025 PySATL project" +__license__ = "SPDX-License-Identifier: MIT" + +from abc import ABC, abstractmethod + +import numpy as np + + +class AComponentsNumber(ABC): + """Abstract class for methods estimating number of components in mixture""" + + @property + @abstractmethod + def name(self) -> str: + """Name getter""" + + @abstractmethod + def estimate(self, X: np.ndarray) -> int: + """The function for estimating number of components""" diff --git a/rework_pysatl_mpest/preprocessing/components_number/elbow.py b/rework_pysatl_mpest/preprocessing/components_number/elbow.py new file mode 100644 index 0000000..719ec63 --- /dev/null +++ b/rework_pysatl_mpest/preprocessing/components_number/elbow.py @@ -0,0 +1,58 @@ +"""Module which contains Elbow Method""" + +__author__ = "Mark Dubrovchenko" +__copyright__ = "Copyright (c) 2025 PySATL project" +__license__ = "SPDX-License-Identifier: MIT" + +import numpy as np +from kneed import KneeLocator +from rework_pysatl_mpest.preprocessing.components_number.abstract_estimator import AComponentsNumber +from sklearn.cluster import KMeans + + +class Elbow(AComponentsNumber): + """ + Elbow method with KMeans++ + + Parameters + ----- + :kmax: int — Assumed maximum number of components + :k_init: int default: 1 — Number of times the KMeans is run + :k_max_iter: int default: 300 — Maximum number of iterations in KMeans + :random_state: int | None default: None — Determines random generation for KMeans + """ + + def __init__( + self, + kmax: int, + k_init: int = 1, + k_max_iter: int = 300, + random_state: int | None = None, + ) -> None: + self.kmax = kmax + self.k_init = k_init + self.k_max_iter = k_max_iter + self.random_state = random_state + + @property + def name(self) -> str: + return "Elbow" + + def estimate(self, X: np.ndarray) -> int: + X = X.reshape(-1, 1) + k_range = range(1, self.kmax + 2) + wcss = [] + + for k in k_range: + kmeans_elbow = KMeans( + max_iter=self.k_max_iter, + n_clusters=k, + init="k-means++", + n_init=self.k_init, + random_state=self.random_state, + ).fit(X) + wcss.append(kmeans_elbow.inertia_) + + knee = KneeLocator(k_range, wcss, curve="convex", direction="decreasing").elbow + + return knee diff --git a/rework_pysatl_mpest/preprocessing/components_number/peaks.py b/rework_pysatl_mpest/preprocessing/components_number/peaks.py new file mode 100644 index 0000000..97a3426 --- /dev/null +++ b/rework_pysatl_mpest/preprocessing/components_number/peaks.py @@ -0,0 +1,41 @@ +"""Module which contains Peaks Method""" + +__author__ = "Mark Dubrovchenko" +__copyright__ = "Copyright (c) 2025 PySATL project" +__license__ = "SPDX-License-Identifier: MIT" + +from math import ceil 
+ +import numpy as np +from rework_pysatl_mpest.preprocessing.components_number.abstract_estimator import AComponentsNumber +from scipy.signal import find_peaks + + +class Peaks(AComponentsNumber): + """Peaks Method with Empirical Density""" + + @property + def name(self) -> str: + return "Peaks" + + def estimate(self, X: np.ndarray) -> int: + """ + Doane's formula to determine the number of bins + # nbins = 1 + log2(n) + log2(1 + |skewness| / sg1) + # sg1 = √(6.0 * (n - 2.0) / ((n + 1.0) * (n + 3.0))) + """ + + n = X.size + sg1 = np.sqrt(6.0 * (n - 2.0) / ((n + 1.0) * (n + 3.0))) + skew = np.mean(((X - np.mean(X)) / np.std(X)) ** 3) + + nbins = ceil(1 + np.log2(n) + np.log2(1 + abs(skew) / sg1)) + + # Empirical density + hist = np.histogram(X, bins=nbins, density=True)[0] + hist = np.concatenate((np.zeros(1), hist, np.zeros(1))) + + # Peaks counting + peaks, _ = find_peaks(hist) + peaks_count = len(peaks) + return peaks_count
diff --git a/rework_pysatl_mpest/preprocessing/components_number/silhouette.py b/rework_pysatl_mpest/preprocessing/components_number/silhouette.py new file mode 100644 index 0000000..af4617d --- /dev/null +++ b/rework_pysatl_mpest/preprocessing/components_number/silhouette.py @@ -0,0 +1,58 @@ +"""Module which contains Silhouette Method""" + +__author__ = "Mark Dubrovchenko" +__copyright__ = "Copyright (c) 2025 PySATL project" +__license__ = "SPDX-License-Identifier: MIT" + +import numpy as np +from rework_pysatl_mpest.preprocessing.components_number.abstract_estimator import AComponentsNumber +from sklearn.cluster import KMeans +from sklearn.metrics import silhouette_score + + +class Silhouette(AComponentsNumber): + """ + Silhouette method with KMeans++ + + Parameters + ---------- + :kmax: int — Assumed maximum number of components + :k_init: int default: 1 — Number of times the KMeans is run + :k_max_iter: int default: 300 — Maximum number of iterations in KMeans + :random_state: int | None default: None — Determines random generation for KMeans + """ + + def __init__( + self, + kmax: int, + k_init: int = 1, + k_max_iter: int = 300, + random_state: int | None = None, + ) -> None: + self.kmax = kmax + self.k_init = k_init + self.k_max_iter = k_max_iter + self.random_state = random_state + + @property + def name(self) -> str: + return "Silhouette" + + def estimate(self, X: np.ndarray) -> int: + X = X.reshape(-1, 1) + k_range = range(2, self.kmax + 1) # possible components: [2, kmax] + silhouettes = [] + + for k in k_range: + kmeans_silhouette = KMeans( + n_clusters=k, + max_iter=self.k_max_iter, + init="k-means++", + n_init=self.k_init, + random_state=self.random_state, + ).fit(X) + silhouettes.append(silhouette_score(X, kmeans_silhouette.labels_)) + + optimal_k = silhouettes.index(max(silhouettes)) + 2 + + return optimal_k
diff --git a/rework_tests/unit/preprocessing/components_family/criterions/test_frequency_criterions.py b/rework_tests/unit/preprocessing/components_family/criterions/test_frequency_criterions.py new file mode 100644 index 0000000..8e5b92c --- /dev/null +++ b/rework_tests/unit/preprocessing/components_family/criterions/test_frequency_criterions.py @@ -0,0 +1,252 @@ +import numpy as np +import pytest +from rework_pysatl_mpest.preprocessing.components_family.criterions.frequency_criterions import ( + CDct, + CDctEnergy, + CSpecBandwidth, + CSpecCentroid, + CSpecDecrease, + CSpecEnergy, + CSpecEntropy, + CSpecFlatness, + CSpecRolloff, + CSpecSlope, + CWaveletEnergy, + CWaveletEntropy, + CWaveletLarge, + CWaveletMean, + CWaveletStd, +) + + +class 
TestFrequencyCriterions: + @pytest.mark.parametrize( + "dct_type,hist,expected", + [ + (1, [1, 1, 1, 1, 1], 0), + (1, [1, 2, 3, 4, 5], -0.20996), + (1, [1, 2, 4, 3, 0], 0.02297), + (2, [1, 1, 1, 1, 1], 0), + (2, [1, 2, 3, 4, 5], 0), + (2, [1, 2, 4, 3, 0], -0.29953), + (3, [1, 1, 1, 1, 1], 0), + (3, [1, 2, 3, 4, 5], -0.01893), + (3, [1, 2, 4, 3, 0], 0.09732), + ], + ) + def test_cdct(self, dct_type, hist, expected): + error_rate = 1e-5 + result = CDct(dct_type).score(hist) + assert np.abs(expected - result) < error_rate + + @pytest.mark.parametrize( + "hist,expected", + [ + ([1, 1, 1, 1, 1], 0), + ([1, 2, 3, 4, 5], 0), + ([1, 2, 4, 3, 0], 0.00027), + ], + ) + def test_cdct_energy(self, hist, expected): + error_rate = 1e-5 + result = CDctEnergy().score(hist) + assert np.abs(expected - result) < error_rate + + @pytest.mark.parametrize( + "hist,expected", + [ + ([1, 1, 1, 1, 1], 0), + ([1, 2, 3, 4, 5], 0.69712), + ([1, 2, 4, 3, 0], 0.66418), + ], + ) + def test_spec_bandwidth(self, hist, expected): + error_rate = 1e-5 + result = CSpecBandwidth().score(hist) + assert np.abs(expected - result) < error_rate + + @pytest.mark.parametrize( + "hist,expected", + [ + ([1, 1, 1, 1, 1], 0), + ([1, 2, 3, 4, 5], 0.43463), + ([1, 2, 4, 3, 0], 0.48264), + ], + ) + def test_spec_centroid(self, hist, expected): + error_rate = 1e-5 + result = CSpecCentroid().score(hist) + assert np.abs(expected - result) < error_rate + + @pytest.mark.parametrize( + "hist,expected", + [ + ([1, 1, 1, 1, 1], 0.00002), + ([1, 2, 3, 4, 5], -0.23606), + ([1, 2, 4, 3, 0], -0.50523), + ], + ) + def test_spec_decrease(self, hist, expected): + error_rate = 1e-5 + result = CSpecDecrease().score(hist) + assert np.abs(expected - result) < error_rate + + @pytest.mark.parametrize( + "hist,expected", + [ + ([1, 1, 1, 1, 1], 1), + ([1, 2, 3, 4, 5], 1.11111), + ([1, 2, 4, 3, 0], 1.24999), + ], + ) + def test_spec_energy(self, hist, expected): + error_rate = 1e-5 + result = CSpecEnergy().score(hist) + assert np.abs(expected - result) < error_rate + + @pytest.mark.parametrize( + "hist,expected", + [ + ([1, 1, 1, 1, 1], 0), + ([1, 2, 3, 4, 5], 0.85048), + ([1, 2, 4, 3, 0], 0.46104), + ], + ) + def test_spec_entropy(self, hist, expected): + error_rate = 1e-5 + result = CSpecEntropy().score(hist) + assert np.abs(expected - result) < error_rate + + @pytest.mark.parametrize( + "hist,expected", + [ + ([1, 1, 1, 1, 1], 0), + ([1, 2, 3, 4, 5], 0.75605), + ([1, 2, 4, 3, 0], 0.77272), + ], + ) + def test_spec_flatness(self, hist, expected): + error_rate = 1e-5 + result = CSpecFlatness().score(hist) + assert np.abs(expected - result) < error_rate + + @pytest.mark.parametrize( + "hist,expected", + [ + ([1, 1, 1, 1, 1], 0), + ([1, 2, 3, 4, 5], 1), + ([1, 2, 4, 3, 0], 1), + ], + ) + def test_spec_rolloff(self, hist, expected): + error_rate = 1e-5 + result = CSpecRolloff().score(hist) + assert np.abs(expected - result) < error_rate + + @pytest.mark.parametrize( + "hist,expected", + [ + ([1, 1, 1, 1, 1], -0.49999), + ([1, 2, 3, 4, 5], -0.41237), + ([1, 2, 4, 3, 0], -0.42193), + ], + ) + def test_spec_slope(self, hist, expected): + error_rate = 1e-5 + result = CSpecSlope().score(hist) + assert np.abs(expected - result) < error_rate + + @pytest.mark.parametrize( + "level,hist,expected", + [ + (1, [1, 1, 1, 1, 1], 1), + (1, [1, 2, 3, 4, 5], 0.86538), + (1, [1, 2, 4, 3, 0], 0.41666), + (2, [1, 1, 1, 1, 1], 0), + (2, [1, 2, 3, 4, 5], 0.09615), + (2, [1, 2, 4, 3, 0], 0.41666), + (3, [1, 1, 1, 1, 1], 0), + (3, [1, 2, 3, 4, 5], 0.03076), + (3, [1, 2, 4, 3, 0], 0.13333), 
+ ], + ) + def test_wavelet_energy(self, level, hist, expected): + error_rate = 1e-5 + result = CWaveletEnergy(level=level).score(hist) + assert np.abs(expected - result) < error_rate + + @pytest.mark.parametrize( + "level,hist,expected", + [ + (1, [1, 1, 1, 1, 1], 0), + (1, [1, 2, 3, 4, 5], 0), + (1, [1, 2, 4, 3, 0], 0), + (2, [1, 1, 1, 1, 1], 0), + (2, [1, 2, 3, 4, 5], 0), + (2, [1, 2, 4, 3, 0], 0), + (3, [1, 1, 1, 1, 1], 0), + (3, [1, 2, 3, 4, 5], 0), + (3, [1, 2, 4, 3, 0], 0), + ], + ) + def test_wavelet_entropy(self, level, hist, expected): + error_rate = 1e-5 + result = CWaveletEntropy(level=level).score(hist) + assert np.abs(expected - result) < error_rate + + @pytest.mark.parametrize( + "level,hist,expected", + [ + (1, [1, 1, 1, 1, 1], 0.56568), + (1, [1, 2, 3, 4, 5], 0.70710), + (1, [1, 2, 4, 3, 0], 0.35355), + (2, [1, 1, 1, 1, 1], 0), + (2, [1, 2, 3, 4, 5], -0.23570), + (2, [1, 2, 4, 3, 0], 0.35355), + (3, [1, 1, 1, 1, 1], 0), + (3, [1, 2, 3, 4, 5], -0.06666), + (3, [1, 2, 4, 3, 0], -0.1), + ], + ) + def test_wavelet_mean(self, level, hist, expected): + error_rate = 1e-5 + result = CWaveletMean(level=level).score(hist) + assert np.abs(expected - result) < error_rate + + @pytest.mark.parametrize( + "level,hist,expected", + [ + (1, [1, 1, 1, 1, 1], 0), + (1, [1, 2, 3, 4, 5], 0), + (1, [1, 2, 4, 3, 0], 0), + (2, [1, 1, 1, 1, 1], 0), + (2, [1, 2, 3, 4, 5], 0), + (2, [1, 2, 4, 3, 0], 0), + (3, [1, 1, 1, 1, 1], 0), + (3, [1, 2, 3, 4, 5], 0.06666), + (3, [1, 2, 4, 3, 0], 0.1), + ], + ) + def test_wavelet_std(self, level, hist, expected): + error_rate = 1e-5 + result = CWaveletStd(level=level).score(hist) + assert np.abs(expected - result) < error_rate + + @pytest.mark.parametrize( + "level,hist,expected", + [ + (1, [1, 1, 1, 1, 1], 1), + (1, [1, 2, 3, 4, 5], 1), + (1, [1, 2, 4, 3, 0], 1), + (2, [1, 1, 1, 1, 1], 0), + (2, [1, 2, 3, 4, 5], 1), + (2, [1, 2, 4, 3, 0], 1), + (3, [1, 1, 1, 1, 1], 0), + (3, [1, 2, 3, 4, 5], 0.5), + (3, [1, 2, 4, 3, 0], 0.5), + ], + ) + def test_wavelet_large(self, level, hist, expected): + error_rate = 1e-5 + result = CWaveletLarge(level=level).score(hist) + assert np.abs(expected - result) < error_rate diff --git a/rework_tests/unit/preprocessing/components_family/criterions/test_hist_criterions.py b/rework_tests/unit/preprocessing/components_family/criterions/test_hist_criterions.py new file mode 100644 index 0000000..7ef657a --- /dev/null +++ b/rework_tests/unit/preprocessing/components_family/criterions/test_hist_criterions.py @@ -0,0 +1,132 @@ +import numpy as np +import pytest +from rework_pysatl_mpest.preprocessing.components_family.criterions.hist_criterions import ( + CHistEnergy, + CHistEntropy, + CHistFlat, + CHistLength, + CHistUniform, + CSobelCount, + CSobelMax, + CSobelMean, + CSobelMin, +) + + +class TestHistCriterions: + @pytest.mark.parametrize( + "hist,expected", + [ + ([1, 1, 1, 1, 1], 0.2), + ([1, 2, 3, 4, 5], 0.24444), + ([1, 2, 4, 3, 0], 0.3), + ], + ) + def test_hist_energy(self, hist, expected): + error_rate = 1e-5 + result = CHistEnergy().score(hist) + assert np.abs(expected - result) < error_rate + + @pytest.mark.parametrize( + "hist,expected", + [ + ([1, 1, 1, 1, 1], 2.32192), + ([1, 2, 3, 4, 5], 2.14925), + ([1, 2, 4, 3, 0], 1.84643), + ], + ) + def test_hist_entropy(self, hist, expected): + error_rate = 1e-5 + result = CHistEntropy().score(hist) + assert np.abs(expected - result) < error_rate + + @pytest.mark.parametrize( + "hist,expected", + [ + ([1, 1, 1, 1, 1], 1), + ([1, 2, 3, 4, 5], 0), + ([1, 2, 4, 3, 0], 0), + ], + ) + def 
test_hist_flat(self, hist, expected): + error_rate = 1e-5 + result = CHistFlat().score(hist) + assert np.abs(expected - result) < error_rate + + @pytest.mark.parametrize( + "hist,expected", + [ + ([1, 1, 1, 1, 1], 0), + ([1, 2, 3, 4, 5], 0.26666), + ([1, 2, 4, 3, 0], 0.7), + ], + ) + def test_hist_length(self, hist, expected): + error_rate = 1e-5 + result = CHistLength().score(hist) + assert np.abs(expected - result) < error_rate + + @pytest.mark.parametrize( + "hist,expected", + [ + ([1, 1, 1, 1, 1], 0), + ([1, 2, 3, 4, 5], 0.17778), + ([1, 2, 4, 3, 0], 0.31596), + ], + ) + def test_hist_uniform(self, hist, expected): + error_rate = 1e-5 + result = CHistUniform().score(hist) + assert np.abs(expected - result) < error_rate + + @pytest.mark.parametrize( + "hist,expected", + [ + ([1, 1, 1, 1, 1], 0), + ([1, 2, 3, 4, 5], 1), + ([1, 2, 4, 3, 0], 1), + ], + ) + def test_sobel_count(self, hist, expected): + error_rate = 1e-5 + result = CSobelCount().score(hist) + assert np.abs(expected - result) < error_rate + + @pytest.mark.parametrize( + "hist,expected", + [ + ([1, 1, 1, 1, 1], 0), + ([1, 2, 3, 4, 5], 0.13333), + ([1, 2, 4, 3, 0], 0.4), + ], + ) + def test_sobel_max(self, hist, expected): + error_rate = 1e-5 + result = CSobelMax().score(hist) + assert np.abs(expected - result) < error_rate + + @pytest.mark.parametrize( + "hist,expected", + [ + ([1, 1, 1, 1, 1], 0), + ([1, 2, 3, 4, 5], 0.06666), + ([1, 2, 4, 3, 0], 0.09999), + ], + ) + def test_sobel_min(self, hist, expected): + error_rate = 1e-5 + result = CSobelMin().score(hist) + assert np.abs(expected - result) < error_rate + + @pytest.mark.parametrize( + "hist,expected", + [ + ([1, 1, 1, 1, 1], 0), + ([1, 2, 3, 4, 5], 0.10666), + ([1, 2, 4, 3, 0], 0.24), + ], + ) + def test_sobel_mean(self, hist, expected): + error_rate = 1e-5 + result = CSobelMean().score(hist) + assert np.abs(expected - result) < error_rate diff --git a/rework_tests/unit/preprocessing/components_family/criterions/test_peaks_criterions.py b/rework_tests/unit/preprocessing/components_family/criterions/test_peaks_criterions.py new file mode 100644 index 0000000..80e0eef --- /dev/null +++ b/rework_tests/unit/preprocessing/components_family/criterions/test_peaks_criterions.py @@ -0,0 +1,285 @@ +import numpy as np +import pytest +from rework_pysatl_mpest.preprocessing.components_family.criterions.peaks_criterions import ( + CPeaksCount, + CPeaksDistMax, + CPeaksDistMean, + CPeaksDistMin, + CPeaksFirst, + CPeaksLast, + CPeaksMax, + CPeaksMean, + CPeaksMin, + CPeaksWidthMax, + CPeaksWidthMean, + CPeaksWidthMin, + CValleysDistMax, + CValleysDistMean, + CValleysDistMin, + CValleysMax, + CValleysMean, + CValleysMin, + CValleysWidthMax, + CValleysWidthMean, + CValleysWidthMin, +) + + +class TestPeaksCriterions: + @pytest.mark.parametrize( + "hist,expected", + [ + ([1, 1, 1, 1, 1, 1, 1, 1], 1), + ([1, 2, 0, 4, 3, 3, 4, 2], 3), + ([1, 2, 0, 0, 1, 3, 4, 2], 2), + ], + ) + def test_peaks_count(self, hist, expected): + result = CPeaksCount().score(hist) + assert expected == result + + @pytest.mark.parametrize( + "hist,expected", + [ + ([1, 1, 1, 1, 1, 1, 1, 1], 0), + ([1, 2, 0, 4, 3, 3, 4, 2], 0), + ([2, 1, 0, 0, 1, 3, 4, 2], 1), + ], + ) + def test_peaks_first(self, hist, expected): + result = CPeaksFirst().score(hist) + assert expected == result + + @pytest.mark.parametrize( + "hist,expected", + [ + ([1, 1, 1, 1, 1, 1, 1, 1], 0), + ([1, 2, 0, 4, 3, 3, 4, 2], 0), + ([1, 2, 0, 0, 1, 3, 4, 5], 1), + ], + ) + def test_peaks_last(self, hist, expected): + result = CPeaksLast().score(hist) + 
assert expected == result + + @pytest.mark.parametrize( + "hist,expected", + [ + ([1, 1, 1, 1, 1, 1, 1, 1], 1), + ([1, 2, 0, 4, 3, 3, 4, 2], 0.5625), + ([1, 2, 0, 0, 1, 3, 4, 2], 0.3125), + ], + ) + def test_peaks_width_max(self, hist, expected): + result = CPeaksWidthMax().score(hist) + assert expected == result + + @pytest.mark.parametrize( + "hist,expected", + [ + ([1, 1, 1, 1, 1, 1, 1, 1], 1), + ([1, 2, 0, 4, 3, 3, 4, 2], 0.4375), + ([1, 2, 0, 0, 1, 3, 4, 2], 0.25), + ], + ) + def test_peaks_width_mean(self, hist, expected): + result = CPeaksWidthMean().score(hist) + assert expected == result + + @pytest.mark.parametrize( + "hist,expected", + [ + ([1, 1, 1, 1, 1, 1, 1, 1], 1), + ([1, 2, 0, 4, 3, 3, 4, 2], 0.1875), + ([1, 2, 0, 0, 1, 3, 4, 2], 0.1875), + ], + ) + def test_peaks_width_min(self, hist, expected): + result = CPeaksWidthMin().score(hist) + assert expected == result + + @pytest.mark.parametrize( + "hist,expected", + [ + ([1, 1, 1, 1, 1, 1, 1, 1], 0), + ([1, 2, 0, 4, 3, 3, 4, 2], 0.25), + ([1, 2, 0, 0, 1, 3, 4, 2], 0.3125), + ], + ) + def test_valleys_width_max(self, hist, expected): + result = CValleysWidthMax().score(hist) + assert expected == result + + @pytest.mark.parametrize( + "hist,expected", + [ + ([1, 1, 1, 1, 1, 1, 1, 1], 0), + ([1, 2, 0, 4, 3, 3, 4, 2], 0.171875), + ([1, 2, 0, 0, 1, 3, 4, 2], 0.3125), + ], + ) + def test_valleys_width_mean(self, hist, expected): + result = CValleysWidthMean().score(hist) + assert expected == result + + @pytest.mark.parametrize( + "hist,expected", + [ + ([1, 1, 1, 1, 1, 1, 1, 1], 0), + ([1, 2, 0, 4, 3, 3, 4, 2], 0.09375), + ([1, 2, 0, 0, 1, 3, 4, 2], 0.3125), + ], + ) + def test_valleys_width_min(self, hist, expected): + result = CValleysWidthMin().score(hist) + assert expected == result + + @pytest.mark.parametrize( + "hist,expected", + [ + ([1, 1, 1, 1, 1, 1, 1, 1], 0), + ([1, 2, 0, 4, 3, 3, 4, 2], 0.25), + ([1, 2, 0, 0, 1, 3, 4, 2], 0.5), + ], + ) + def test_peaks_dist_max(self, hist, expected): + result = CPeaksDistMax().score(hist) + assert expected == result + + @pytest.mark.parametrize( + "hist,expected", + [ + ([1, 1, 1, 1, 1, 1, 1, 1], 0), + ([1, 2, 0, 4, 3, 3, 4, 2], 0.1875), + ([1, 2, 0, 0, 1, 3, 4, 2], 0.5), + ], + ) + def test_peaks_dist_mean(self, hist, expected): + result = CPeaksDistMean().score(hist) + assert expected == result + + @pytest.mark.parametrize( + "hist,expected", + [ + ([1, 1, 1, 1, 1, 1, 1, 1], 0), + ([1, 2, 0, 4, 3, 3, 4, 2], 0.125), + ([1, 2, 0, 0, 1, 3, 4, 2], 0.5), + ], + ) + def test_peaks_dist_min(self, hist, expected): + result = CPeaksDistMin().score(hist) + assert expected == result + + @pytest.mark.parametrize( + "hist,expected", + [ + ([1, 1, 1, 1, 1, 1, 1, 1], 0), + ([1, 2, 0, 4, 3, 3, 4, 2], 0.125), + ([1, 2, 0, 0, 1, 3, 4, 2], 0), + ], + ) + def test_valleys_dist_max(self, hist, expected): + result = CValleysDistMax().score(hist) + assert expected == result + + @pytest.mark.parametrize( + "hist,expected", + [ + ([1, 1, 1, 1, 1, 1, 1, 1], 0), + ([1, 2, 0, 4, 3, 3, 4, 2], 0.125), + ([1, 2, 0, 0, 1, 3, 4, 2], 0), + ], + ) + def test_valleys_dist_mean(self, hist, expected): + result = CValleysDistMean().score(hist) + assert expected == result + + @pytest.mark.parametrize( + "hist,expected", + [ + ([1, 1, 1, 1, 1, 1, 1, 1], 0), + ([1, 2, 0, 4, 3, 3, 4, 2], 0.125), + ([1, 2, 0, 0, 1, 3, 4, 2], 0), + ], + ) + def test_valleys_dist_min(self, hist, expected): + result = CValleysDistMin().score(hist) + assert expected == result + + @pytest.mark.parametrize( + "hist,expected", + [ + ([1, 1, 1, 1, 1, 
1, 1, 1], 0.125), + ([1, 2, 0, 4, 3, 3, 4, 2], 0.21052), + ([1, 2, 0, 0, 1, 3, 4, 2], 0.30769), + ], + ) + def test_peaks_max(self, hist, expected): + error_rate = 1e-5 + result = CPeaksMax().score(hist) + assert np.abs(expected - result) < error_rate + + @pytest.mark.parametrize( + "hist,expected", + [ + ([1, 1, 1, 1, 1, 1, 1, 1], 0.125), + ([1, 2, 0, 4, 3, 3, 4, 2], 0.17543), + ([1, 2, 0, 0, 1, 3, 4, 2], 0.23076), + ], + ) + def test_peaks_mean(self, hist, expected): + error_rate = 1e-5 + result = CPeaksMean().score(hist) + assert np.abs(expected - result) < error_rate + + @pytest.mark.parametrize( + "hist,expected", + [ + ([1, 1, 1, 1, 1, 1, 1, 1], 0.125), + ([1, 2, 0, 4, 3, 3, 4, 2], 0.10526), + ([1, 2, 0, 0, 1, 3, 4, 2], 0.15384), + ], + ) + def test_peaks_min(self, hist, expected): + error_rate = 1e-5 + result = CPeaksMin().score(hist) + assert np.abs(expected - result) < error_rate + + @pytest.mark.parametrize( + "hist,expected", + [ + ([1, 1, 1, 1, 1, 1, 1, 1], 0), + ([1, 2, 0, 4, 3, 3, 4, 2], 0.15789), + ([1, 2, 0, 0, 1, 3, 4, 2], 0), + ], + ) + def test_valleys_max(self, hist, expected): + error_rate = 1e-5 + result = CValleysMax().score(hist) + assert np.abs(expected - result) < error_rate + + @pytest.mark.parametrize( + "hist,expected", + [ + ([1, 1, 1, 1, 1, 1, 1, 1], 0), + ([1, 2, 0, 4, 3, 3, 4, 2], 0.07894), + ([1, 2, 0, 0, 1, 3, 4, 2], 0), + ], + ) + def test_valleys_mean(self, hist, expected): + error_rate = 1e-5 + result = CValleysMean().score(hist) + assert np.abs(expected - result) < error_rate + + @pytest.mark.parametrize( + "hist,expected", + [ + ([1, 1, 1, 1, 1, 1, 1, 1], 0), + ([1, 2, 0, 4, 3, 3, 4, 2], 0), + ([1, 2, 0, 0, 1, 3, 4, 2], 0), + ], + ) + def test_valleys_min(self, hist, expected): + error_rate = 1e-5 + result = CValleysMin().score(hist) + assert np.abs(expected - result) < error_rate diff --git a/rework_tests/unit/preprocessing/components_family/criterions/test_sample_criterions.py b/rework_tests/unit/preprocessing/components_family/criterions/test_sample_criterions.py new file mode 100644 index 0000000..e6dfc9b --- /dev/null +++ b/rework_tests/unit/preprocessing/components_family/criterions/test_sample_criterions.py @@ -0,0 +1,202 @@ +import numpy as np +import pytest +from rework_pysatl_mpest.preprocessing.components_family.criterions.sample_criterions import ( + CBootKurt, + CHillAbs, + CIqr, + CKurt, + CKurtMoors, + CLogRatio, + CMaxZscore, + CNegativeValue, + COutlierFraction, + CRange, + CSkew, + CSkewBowley, + CSpacingGap, + CSpacingGini, +) + + +class TestSampleCriterions: + @pytest.mark.parametrize( + "hist,expected", + [ + ([1, 1, 1, 1, 1], 0), + ([1, 2, 3, 4, 5], 0.31044), + ([1, 2, 4, 3, 0], 0.29186), + ], + ) + def test_boot_kurt(self, hist, expected): + error_rate = 1e-5 + result = CBootKurt(state=52).score(hist) + assert np.abs(expected - result) < error_rate + + @pytest.mark.parametrize( + "hist,expected", + [ + ([1, 1, 1, 1, 1], 0), + ([1, 2, 3, 4, 5], 0.39925), + ([1, 2, 4, 3, 0], 0.5493), + ], + ) + def test_hill_abs(self, hist, expected): + error_rate = 1e-5 + result = CHillAbs().score(hist) + assert np.abs(expected - result) < error_rate + + @pytest.mark.parametrize( + "hist,expected", + [ + ([1, 1, 1, 1, 1], 0), + ([1, 2, 3, 4, 5], 0), + ([-1, 2, 4, 3, 0], 1), + ], + ) + def test_negative_value(self, hist, expected): + error_rate = 1e-5 + result = CNegativeValue().score(hist) + assert np.abs(expected - result) < error_rate + + @pytest.mark.parametrize( + "hist,expected", + [ + ([1, 1, 1, 1, 1], 0), + ([1, 2, 3, 4, 5], -1.3), + ([1, 
2, 4, 3, 0], -1.3), + ], + ) + def test_kurt(self, hist, expected): + error_rate = 1e-5 + result = CKurt().score(hist) + assert np.abs(expected - result) < error_rate + + @pytest.mark.parametrize( + "hist,expected", + [ + ([1, 1, 1, 1, 1], 0), + ([1, 2, 3, 4, 5], 0.66666), + ([1, 2, 4, 3, 0], 1), + ], + ) + def test_iqr(self, hist, expected): + error_rate = 1e-5 + result = CIqr().score(hist) + assert np.abs(expected - result) < error_rate + + @pytest.mark.parametrize( + "hist,expected", + [ + ([1, 1, 1, 1, 1], 0), + ([1, 2, 3, 4, 5], 0), + ([1, 2, 4, 3, 0], 0), + ], + ) + def test_log_ratio(self, hist, expected): + error_rate = 1e-5 + result = CLogRatio().score(hist) + assert np.abs(expected - result) < error_rate + + @pytest.mark.parametrize( + "hist,expected", + [ + ([1, 1, 1, 1, 1], 0), + ([1, 2, 3, 4, 5], 0), + ([1, 2, 4, 3, 0], 0), + ], + ) + def test_outlier_fraction(self, hist, expected): + error_rate = 1e-5 + result = COutlierFraction().score(hist) + assert np.abs(expected - result) < error_rate + + @pytest.mark.parametrize( + "hist,expected", + [ + ([1, 1, 1, 1, 1], 0), + ([1, 2, 3, 4, 5], 4), + ([1, 2, 4, 3, 0], 4), + ], + ) + def test_range(self, hist, expected): + error_rate = 1e-5 + result = CRange().score(hist) + assert np.abs(expected - result) < error_rate + + @pytest.mark.parametrize( + "hist,expected", + [ + ([1, 1, 1, 1, 1], 0), + ([1, 2, 3, 4, 5], 0), + ([-1, 2, 4, 3, 0], -0.15798), + ], + ) + def test_skew(self, hist, expected): + error_rate = 1e-5 + result = CSkew().score(hist) + assert np.abs(expected - result) < error_rate + + @pytest.mark.parametrize( + "hist,expected", + [ + ([1, 1, 1, 1, 1], 0), + ([1, 2, 3, 4, 5], 0), + ([-1, 2, 4, 3, 0], -0.33333), + ], + ) + def test_skew_bowley(self, hist, expected): + error_rate = 1e-5 + result = CSkewBowley().score(hist) + assert np.abs(expected - result) < error_rate + + @pytest.mark.parametrize( + "hist,expected", + [ + ([1, 1, 1, 1, 1], 0), + ([1, 2, 3, 4, 5], 0), + ([1, 2, 4, 3, 0], 0), + ], + ) + def test_spacing_gap(self, hist, expected): + error_rate = 1e-5 + result = CSpacingGap().score(hist) + assert np.abs(expected - result) < error_rate + + @pytest.mark.parametrize( + "hist,expected", + [ + ([1, 1, 1, 1, 1], 0), + ([1, 2, 3, 4, 5], 0), + ([-1, 2, 4, 3, 0], 0.15), + ], + ) + def test_spacing_gini(self, hist, expected): + error_rate = 1e-5 + result = CSpacingGini().score(hist) + assert np.abs(expected - result) < error_rate + + @pytest.mark.parametrize( + "hist,expected", + [ + ([1, 1, 1, 1, 1], 0), + ([1, 2, 3, 4, 5], 1.41421), + ([1, 2, 4, 3, 0], 1.41421), + ], + ) + def test_zscore(self, hist, expected): + error_rate = 1e-5 + result = CMaxZscore().score(hist) + assert np.abs(expected - result) < error_rate + + @pytest.mark.parametrize( + "hist,expected", + [ + ([1, 1, 1, 1, 1], 0), + ([1, 2, 3, 4, 5], 2.66666), + ([1, 2, 4, 3, 0], 2.66666), + ], + ) + def test_kurt_moors(self, hist, expected): + error_rate = 1e-5 + result = CKurtMoors().score(hist) + assert np.abs(expected - result) < error_rate diff --git a/rework_tests/unit/preprocessing/components_number/components_num_utils.py b/rework_tests/unit/preprocessing/components_number/components_num_utils.py new file mode 100644 index 0000000..01cbe0a --- /dev/null +++ b/rework_tests/unit/preprocessing/components_number/components_num_utils.py @@ -0,0 +1,23 @@ +"""Module which contains a utility for testing methods of estimating the number of components""" + +import numpy as np +from rework_pysatl_mpest.core import MixtureModel +from rework_pysatl_mpest.distributions import ContinuousDistribution
+from rework_pysatl_mpest.preprocessing.components_number import AComponentsNumber + + +def run_test( + components: list[ContinuousDistribution], + weights: list[float], + size: int, + method: AComponentsNumber, +) -> int: + """Generate a sample from the given mixture and return the estimated number of components""" + + np.random.seed(42) + + mixture = MixtureModel(components=components, weights=weights) + + X = mixture.generate(size) + result = method.estimate(X) + return result diff --git a/rework_tests/unit/preprocessing/components_number/test_elbow.py b/rework_tests/unit/preprocessing/components_number/test_elbow.py new file mode 100644 index 0000000..f26b2e2 --- /dev/null +++ b/rework_tests/unit/preprocessing/components_number/test_elbow.py @@ -0,0 +1,62 @@ +"""Unit test module which tests the Elbow method""" + +import pytest +from rework_pysatl_mpest.distributions import Exponential, Normal, Weibull +from rework_pysatl_mpest.preprocessing.components_number import Elbow +from rework_tests.unit.preprocessing.components_number.components_num_utils import run_test + + +@pytest.mark.parametrize( + "components, weights, size, kmax", + [ + ( + [Weibull(1.0, 0.0, 0.5), Normal(5.0, 1.0), Normal(15.0, 2.0)], + [0.33, 0.34, 0.33], + 200, + 15, + ), + ( + [Normal(5.0, 2.0), Normal(15.0, 2.0)], + [0.6, 0.4], + 500, + 15, + ), + ( + [Weibull(11.0, 0.0, 2.5), Normal(5.0, 3.0), Exponential(0.0, 0.25), Weibull(18.0, 0.0, 2.0)], + [0.2, 0.2, 0.4, 0.2], + 1000, + 20, + ), + ], +) +def test_correct_estimating(components, weights, size, kmax): + """Runs the Elbow method with a positive outcome""" + assert run_test(components, weights, size, Elbow(kmax, random_state=42)) == len(components) + + +@pytest.mark.parametrize( + "components, weights, size, kmax", + [ + ( + [Normal(5.0, 2.0), Normal(10.0, 2.0), Normal(15.0, 2.0)], + [0.6, 0.2, 0.2], + 200, + 20, + ), + ( + [Normal(5.0, 2.0), Weibull(7.0, 0.0, 3.0)], + [0.5, 0.5], + 500, + 15, + ), + ( + [Exponential(0.0, 0.5), Weibull(6.0, 0.0, 5.0), Weibull(7.0, 0.0, 5.0)], + [0.1, 0.3, 0.6], + 1000, + 15, + ), + ], +) +def test_incorrect_estimating(components, weights, size, kmax): + """Runs the Elbow method with a negative outcome""" + assert run_test(components, weights, size, Elbow(kmax, random_state=42)) != len(components) diff --git a/rework_tests/unit/preprocessing/components_number/test_peaks.py b/rework_tests/unit/preprocessing/components_number/test_peaks.py new file mode 100644 index 0000000..dc95019 --- /dev/null +++ b/rework_tests/unit/preprocessing/components_number/test_peaks.py @@ -0,0 +1,52 @@ +"""Unit test module which tests the Peaks method""" + +import pytest +from rework_pysatl_mpest.distributions import Exponential, Normal, Weibull +from rework_pysatl_mpest.preprocessing.components_number import Peaks +from rework_tests.unit.preprocessing.components_number.components_num_utils import run_test + + +@pytest.mark.parametrize( + "components, weights, size", + [ + ([Normal(5.0, 2.0)], [1.0], 200), + ( + [Weibull(5.0, 0.0, 2.0), Weibull(7.0, 0.0, 1.0), Weibull(11.0, 0.0, 3.0)], + [0.33, 0.33, 0.34], + 500, + ), + ( + [Weibull(4.0, 0.0, 2.0), Normal(7.5, 2.5), Weibull(10.0, 0.0, 4.0)], + [0.2, 0.4, 0.4], + 1000, + ), + ], +) +def test_correct_estimating(components, weights, size): + """Runs the Peaks method with a positive outcome""" + assert run_test(components, weights, size, Peaks()) == len(components) + + +@pytest.mark.parametrize( + "components, weights, size", + [ + ( + [Weibull(10.0, 0.0, 1.0), Weibull(4.0, 0.0, 6.0), Exponential(0.0, 3.5)], + [0.2, 0.4, 0.4], + 200, + ), + (
+ [Exponential(0.0, 0.5), Exponential(0.0, 3.5), Normal(9.0, 3.5), Normal(3.0, 6.0)], + [0.1, 0.2, 0.4, 0.3], + 5000, + ), + ( + [Normal(3.0, 1.5), Weibull(7.0, 0.0, 2.0)], + [0.7, 0.3], + 1000, + ), + ], +) +def test_incorrect_estimating(components, weights, size): + """Runs the Peaks method with a negative outcome""" + assert run_test(components, weights, size, Peaks()) != len(components) diff --git a/rework_tests/unit/preprocessing/components_number/test_silhouette.py b/rework_tests/unit/preprocessing/components_number/test_silhouette.py new file mode 100644 index 0000000..b308ab5 --- /dev/null +++ b/rework_tests/unit/preprocessing/components_number/test_silhouette.py @@ -0,0 +1,57 @@ +"""Unit test module which tests the Silhouette method""" + +import pytest +from rework_pysatl_mpest.distributions import Exponential, Normal, Weibull +from rework_pysatl_mpest.preprocessing.components_number import Silhouette +from rework_tests.unit.preprocessing.components_number.components_num_utils import run_test + + +@pytest.mark.parametrize( + "components, weights, size, kmax", + [ + ( + [Weibull(2.0, 0.0, 10.0), Normal(5.0, 1.0)], + [0.6, 0.4], + 200, + 10, + ), + ( + [Normal(-5.0, 3.0), Normal(2.0, 1.0), Normal(10.0, 2.0)], + [0.3, 0.3, 0.4], + 500, + 10, + ), + ( + [Exponential(0.0, 0.5), Normal(1.0, 3.0), Normal(3.0, 10.0), Normal(5.0, 1.0)], + [0.5, 0.3, 0.1, 0.1], + 500, + 10, + ), + ], +) +def test_correct_estimating(components, weights, size, kmax): + """Runs the Silhouette method with a positive outcome""" + assert run_test(components, weights, size, Silhouette(kmax, random_state=42)) == len(components) + + +@pytest.mark.parametrize( + "components, weights, size, kmax", + [ + ( + [Weibull(1.0, 0.0, 2.0), Weibull(5.0, 0.0, 1.0), Exponential(0.0, 1.0)], + [0.33, 0.33, 0.34], + 200, + 10, + ), + ([Exponential(0.0, 0.5)], [1.0], 500, 10), + ( + [Exponential(0.0, 0.5), Exponential(0.0, 3.0), Weibull(3.0, 0.0, 0.5)], + [0.4, 0.5, 0.1], + 1000, + 10, + ), + ], +) +def test_incorrect_estimating(components, weights, size, kmax): + """Runs the Silhouette method with a negative outcome""" + assert run_test(components, weights, size, Silhouette(kmax, random_state=42)) != len(components)
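Note (reviewer addition, not part of the diff): the three `components_number` test modules above all drive their estimators through the shared `run_test` helper from `components_num_utils.py`. The snippet below is a minimal, hypothetical sketch of that same flow outside pytest; it reuses only constructors and parameters that already appear in the test cases (the two-component Normal mixture from the Elbow tests), and the printed estimates depend on the generated sample.

```python
import numpy as np

from rework_pysatl_mpest.core import MixtureModel
from rework_pysatl_mpest.distributions import Normal
from rework_pysatl_mpest.preprocessing.components_number import Elbow, Peaks, Silhouette

# Fix the seed, as run_test does, so the generated sample is reproducible
np.random.seed(42)

# Two well-separated normal components, mirroring the second Elbow test case
mixture = MixtureModel(components=[Normal(5.0, 2.0), Normal(15.0, 2.0)], weights=[0.6, 0.4])
X = mixture.generate(500)

# Each estimator returns an integer estimate of the number of mixture components
for method in (Elbow(15, random_state=42), Silhouette(10, random_state=42), Peaks()):
    print(type(method).__name__, method.estimate(X))
```

Ideally all three estimators recover the true answer (2) on this sample, but as the `test_incorrect_estimating` cases show, none of the methods is exact for arbitrary mixtures.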