diff --git a/rework_pysatl_mpest/distributions/beta.py b/rework_pysatl_mpest/distributions/beta.py index a0eb41a..c71eee4 100644 --- a/rework_pysatl_mpest/distributions/beta.py +++ b/rework_pysatl_mpest/distributions/beta.py @@ -1,4 +1,4 @@ -"""Module providing four parametric beta distribution distribution class""" +"""Module providing four parametric beta distribution class""" __author__ = "Maksim Pastukhov, Aleksandra Ri" __copyright__ = "Copyright (c) 2025 PySATL project" diff --git a/rework_pysatl_mpest/estimators/iterative/_strategies/__init__.py b/rework_pysatl_mpest/estimators/iterative/_strategies/__init__.py index 08e03b8..0eccff2 100644 --- a/rework_pysatl_mpest/estimators/iterative/_strategies/__init__.py +++ b/rework_pysatl_mpest/estimators/iterative/_strategies/__init__.py @@ -12,11 +12,15 @@ from ....typings import DType from ..pipeline_state import PipelineState from ..steps import OptimizationBlock +from .moments import moments_strategy as _moments_strategy from .q_function import q_function_strategy as _q_function_strategy q_function_strategy: Callable[ [ContinuousDistribution, PipelineState, OptimizationBlock, Optimizer], tuple[int, dict[str, DType]] ] = _q_function_strategy +moments_strategy: Callable[ + [ContinuousDistribution, PipelineState, OptimizationBlock, Optimizer], tuple[int, dict[str, DType]] +] = _moments_strategy -__all__ = ["q_function_strategy"] +__all__ = ["moments_strategy", "q_function_strategy"] diff --git a/rework_pysatl_mpest/estimators/iterative/_strategies/moments.py b/rework_pysatl_mpest/estimators/iterative/_strategies/moments.py new file mode 100644 index 0000000..34ec710 --- /dev/null +++ b/rework_pysatl_mpest/estimators/iterative/_strategies/moments.py @@ -0,0 +1,156 @@ +"""Provides strategies for the Maximization-step based on the Method of Moments. + +This module implements the logic for updating component parameters by +equating theoretical moments of the distribution to the empirical weighted +moments of the data. It uses `functools.singledispatch` to provide a generic +interface that can be specialized for specific distribution types. +""" + +__author__ = "Aleksandra Ri" +__copyright__ = "Copyright (c) 2025 PySATL project" +__license__ = "SPDX-License-Identifier: MIT" + + +from functools import singledispatch + +import numpy as np + +from ....distributions import ContinuousDistribution, Exponential +from ....optimizers import Optimizer +from ....typings import DType +from ..pipeline_state import PipelineState +from ..steps import OptimizationBlock +from .utils import handle_numerical_overflow + +NUMERICAL_TOLERANCE = 1e-9 + + +# ------------------------ +# Base Moments strategy +# ------------------------ + + +@singledispatch +def moments_strategy( + component: ContinuousDistribution[DType], + state: PipelineState[DType], + block: OptimizationBlock, + optimizer: Optimizer, +) -> tuple[int, dict[str, DType]]: + """Generic M-step strategy that uses the Method of Moments. + + This function serves as the base dispatcher. Since the Method of Moments + typically requires analytical expressions for theoretical moments specific + to each distribution, a generic implementation is not provided and raises + NotImplementedError. + + Parameters + ---------- + component : ContinuousDistribution[DType] + The distribution component whose parameters are to be optimized. + state : PipelineState + The current state of the pipeline. + block : OptimizationBlock + The configuration block defining which component to optimize. + optimizer : Optimizer + The numerical optimizer (unused in this strategy). + + Raises + ------ + NotImplementedError + If a specialized moments strategy is not registered for the given + distribution type. + """ + raise NotImplementedError(f"Moments strategy is not implemented for distribution '{type(component).__name__}'.") + + +# --------------------------------- +# Exponential distribution strategy +# --------------------------------- + + +@moments_strategy.register(Exponential) +def _( + component: Exponential[DType], state: PipelineState[DType], block: OptimizationBlock, optimizer: Optimizer +) -> tuple[int, dict[str, DType]]: + """Specialized Moments parameter estimation strategy for the Exponential distribution + using an analytical solution. + + This function provides a closed-form update for the parameters of an + `Exponential` distribution using the Method of Moments. It equates the + theoretical moments to the empirical weighted moments calculated from + the data and responsibilities. + + Notes + ----- + The analytical updates depend on the set of parameters to optimize: + + - **Both `loc` and `rate`**: Derived from the first weighted moment ($m_1$) + and the second weighted moment ($m_2$). The variance is estimated as + $Var = m_2 - m_1^2$. + Then, ``rate`` = $1 / \\sqrt{Var}$ and ``loc`` = $m_1 - \\sqrt{Var}$. + + - **Only `rate`** (fixed `loc`): Derived from the first moment. + ``rate`` = $1 / (m_1 - \\text{loc})$. + + - **Only `loc`** (fixed `rate`): Derived from the first moment. + ``loc`` = $m_1 - (1 / \\text{rate})$. + + This implementation ignores the `optimizer` parameter as it does not + require numerical optimization. + """ + + if state.H is None: + raise ValueError("Responsibility matrix H is not computed.") + + dtype = component.dtype + X = state.X + H_j = state.H[:, block.component_id] + + params_to_optimize = component.params_to_optimize.intersection(block.params_to_optimize) + new_params = {} + + N_j = np.sum(H_j) + + # If the component has negligible responsibility, do not update its parameters. + if np.isclose(N_j, 0.0, atol=NUMERICAL_TOLERANCE): + return block.component_id, {} + + weighted_sum_X = np.dot(H_j, X) + if np.isinf(weighted_sum_X): + handle_numerical_overflow(state, context="Moments optimization") + return block.component_id, {} + + m1 = weighted_sum_X / N_j + + # Update both location (loc) and lambda (rate) if they are in the optimization block + if Exponential.PARAM_LOC in params_to_optimize and Exponential.PARAM_RATE in params_to_optimize: + weighted_sum_X2 = np.dot(H_j, X**2) + if np.isinf(weighted_sum_X2): + handle_numerical_overflow(state, context="Moments optimization") + return block.component_id, {} + + m2 = weighted_sum_X2 / N_j + + variance = np.maximum(m2 - m1**2, dtype(NUMERICAL_TOLERANCE)) + + std_dev = np.sqrt(variance) + + new_params[Exponential.PARAM_RATE] = dtype(1.0 / std_dev) + new_params[Exponential.PARAM_LOC] = dtype(m1 - std_dev) + + # Update lambda (rate) if it's in the optimization block + elif Exponential.PARAM_RATE in params_to_optimize: + denominator = m1 - component.loc + + if np.isclose(denominator, 0.0, NUMERICAL_TOLERANCE): + new_params[Exponential.PARAM_RATE] = component.rate + else: + new_params[Exponential.PARAM_RATE] = dtype(1.0 / denominator) + + # Update location (loc) if it's in the optimization block + elif Exponential.PARAM_LOC in params_to_optimize: + new_loc = m1 - (dtype(1.0) / component.rate) + new_params[Exponential.PARAM_LOC] = dtype(new_loc) + + return block.component_id, new_params diff --git a/rework_pysatl_mpest/estimators/iterative/steps/block.py b/rework_pysatl_mpest/estimators/iterative/steps/block.py index 8454959..1c531c5 100644 --- a/rework_pysatl_mpest/estimators/iterative/steps/block.py +++ b/rework_pysatl_mpest/estimators/iterative/steps/block.py @@ -27,9 +27,13 @@ class MaximizationStrategy(Enum): QFUNCTION Indicates that the optimization should maximize the Q-function, which is the expected value of the complete-data log-likelihood. + MOMENTS + Indicates that the optimization should use the Method of Moments, + matching theoretical moments to empirical weighted moments. """ QFUNCTION = auto() + MOMENTS = auto() @dataclass diff --git a/rework_pysatl_mpest/estimators/iterative/steps/maximization_step.py b/rework_pysatl_mpest/estimators/iterative/steps/maximization_step.py index 6212ac9..e25aa59 100644 --- a/rework_pysatl_mpest/estimators/iterative/steps/maximization_step.py +++ b/rework_pysatl_mpest/estimators/iterative/steps/maximization_step.py @@ -23,7 +23,7 @@ from ....distributions import ContinuousDistribution from ....optimizers import Optimizer from ....typings import DType -from .._strategies import q_function_strategy +from .._strategies import moments_strategy, q_function_strategy from ..pipeline_state import PipelineState from ..pipeline_step import PipelineStep from .block import MaximizationStrategy, OptimizationBlock @@ -64,7 +64,10 @@ class MaximizationStep(PipelineStep[DType]): """ _strategies: ClassVar[Mapping[MaximizationStrategy, Callable]] = MappingProxyType( - {MaximizationStrategy.QFUNCTION: q_function_strategy} + { + MaximizationStrategy.QFUNCTION: q_function_strategy, + MaximizationStrategy.MOMENTS: moments_strategy, + } ) def __init__(self, blocks: Sequence[OptimizationBlock], optimizer: Optimizer): diff --git a/rework_tests/unit/estimators/iterative/_strategies/moments/test_exponential_moments.py b/rework_tests/unit/estimators/iterative/_strategies/moments/test_exponential_moments.py new file mode 100644 index 0000000..66745a5 --- /dev/null +++ b/rework_tests/unit/estimators/iterative/_strategies/moments/test_exponential_moments.py @@ -0,0 +1,260 @@ +"""Tests for Moments optimization strategy for Exponential distribution""" + +__author__ = "Aleksandra Ri" +__copyright__ = "Copyright (c) 2025 PySATL project" +__license__ = "SPDX-License-Identifier: MIT" + + +import numpy as np +import pytest +from hypothesis import given, settings +from hypothesis import strategies as st +from rework_pysatl_mpest.distributions import Exponential +from rework_pysatl_mpest.estimators.iterative import MaximizationStrategy, OptimizationBlock, PipelineState +from rework_pysatl_mpest.estimators.iterative._strategies import moments_strategy +from rework_pysatl_mpest.exceptions import NumericalStabilityError + +DTYPES_TO_TEST = [np.float16, np.float32, np.float64] + +# Test Fixtures +# ------------- + + +@pytest.fixture(params=DTYPES_TO_TEST) +def parametrized_exponential_setup(request) -> tuple[Exponential, PipelineState, np.floating]: + """ + Creates a parametrized fixture providing an Exponential component and a + corresponding PipelineState for various dtypes. + """ + dtype = request.param + + component = Exponential(loc=0.0, rate=1.0, dtype=dtype) + + state = PipelineState( + X=np.array([1.0, 2.0, 3.0, 4.0, 5.0], dtype=dtype), + H=np.array([[0.9, 0.1], [0.8, 0.2], [0.7, 0.3], [0.6, 0.4], [0.5, 0.5]], dtype=dtype), + prev_mixture=None, + curr_mixture=None, + error=None, + ) + return component, state, dtype + + +# Tests +# ----- + + +def test_moments_exponential_raises_value_error_if_h_is_none(parametrized_exponential_setup): + """ + Verifies that a ValueError is raised if the responsibility + matrix H in the pipeline state has not been computed. + """ + exponential_component, state, _ = parametrized_exponential_setup + state.H = None + + block = OptimizationBlock( + component_id=0, params_to_optimize={"loc", "rate"}, maximization_strategy=MaximizationStrategy.MOMENTS + ) + + with pytest.raises(ValueError, match="Responsibility matrix H is not computed."): + moments_strategy(exponential_component, state, block, optimizer=None) + + +def test_moments_exponential_returns_correct_types(parametrized_exponential_setup): + """ + Verifies that the function returns a tuple with the correct + data types (int, dict[str, DType]). + """ + exponential_component, pipeline_state, dtype = parametrized_exponential_setup + + block = OptimizationBlock( + component_id=0, params_to_optimize={"loc", "rate"}, maximization_strategy=MaximizationStrategy.MOMENTS + ) + + result = moments_strategy(exponential_component, pipeline_state, block, optimizer=None) + + assert isinstance(result, tuple) + assert isinstance(result[0], int) + assert isinstance(result[1], dict) + + if result[1]: + key, value = next(iter(result[1].items())) + assert isinstance(key, str) + assert isinstance(value, dtype) + + +@pytest.mark.parametrize( + "params_to_optimize_in_block, fixed_params_on_component, expected_keys", + [ + ({"loc", "rate"}, set(), {"loc", "rate"}), # Optimize both, none are fixed + ({"loc"}, set(), {"loc"}), # Optimize only loc + ({"rate"}, set(), {"rate"}), # Optimize only rate + ({"loc", "rate"}, {"loc"}, {"rate"}), # Optimize both, but loc is fixed + ({"loc", "rate"}, {"rate"}, {"loc"}), # Optimize both, but rate is fixed + ({"loc", "rate"}, {"loc", "rate"}, set()), # Optimize both, but both are fixed + ({"non_existent_param", "loc"}, set(), {"loc"}), # Ignore non-existent params + (set(), set(), set()), # Optimize nothing + ], +) +def test_moments_exponential_respects_fixed_and_optimizable_params( + parametrized_exponential_setup, params_to_optimize_in_block, fixed_params_on_component, expected_keys +): + """ + Verifies that the strategy correctly identifies which parameters + to update based on the optimization block and the component's fixed parameters. + """ + exponential_component, pipeline_state, _ = parametrized_exponential_setup + + for param in fixed_params_on_component: + exponential_component.fix_param(param) + + block = OptimizationBlock( + component_id=0, + params_to_optimize=params_to_optimize_in_block, + maximization_strategy=MaximizationStrategy.MOMENTS, + ) + + _, new_params = moments_strategy(exponential_component, pipeline_state, block, optimizer=None) + + assert set(new_params.keys()) == expected_keys + + +def test_moments_exponential_handles_negligible_responsibility(parametrized_exponential_setup): + """ + Verifies that if a component's total responsibility (N_j) is near zero, + its parameters are not updated. + """ + + exponential_component, pipeline_state, dtype = parametrized_exponential_setup + + pipeline_state.H.fill(dtype(1e-10)) # Make all responsibilities negligible + block = OptimizationBlock( + component_id=0, + params_to_optimize={"loc", "rate"}, + maximization_strategy=MaximizationStrategy.MOMENTS, + ) + + _, new_params = moments_strategy(exponential_component, pipeline_state, block, optimizer=None) + + assert new_params == {} + + +def test_moments_exponential_rate_fallback_when_mean_equals_loc(parametrized_exponential_setup): + """ + Tests the edge case where the weighted mean of the data equals the component's location. + + In the formula `rate = 1 / (mean - loc)`, if `mean == loc`, a division by zero would occur. + The code should handle this by keeping the component's original rate. + """ + component, state, dtype = parametrized_exponential_setup + + loc_val = component.loc + state.X = np.array([loc_val, loc_val], dtype=dtype) + state.H = np.array([[1.0], [1.0]], dtype=dtype) + + component.fix_param("loc") + block = OptimizationBlock( + component_id=0, params_to_optimize={"rate"}, maximization_strategy=MaximizationStrategy.MOMENTS + ) + + _, new_params = moments_strategy(component, state, block, optimizer=None) + + assert Exponential.PARAM_RATE in new_params + assert new_params[Exponential.PARAM_RATE] == component.rate + + +@pytest.mark.parametrize( + "x_val,", + [ + # Case 1: Overflow during first moment calculation (weighted_sum_X) + 40000.0, + # Case 2: Overflow ONLY during second moment calculation (weighted_sum_X2) + 300.0, + ], + ids=["overflow_first_moment", "overflow_second_moment"], +) +def test_moments_exponential_overflow_handling(parametrized_exponential_setup, x_val): + """ + Verifies that NumericalStabilityError is correctly set in state + upon overflow in either the first moment or the second moment calculation. + """ + component, state, _ = parametrized_exponential_setup + + # --- Arrange --- + # Use float16, which has a small range, and large values to force an overflow. + # The max value for float16 is ~65504. The sum of X will exceed this. + dtype = np.float16 + component = Exponential(loc=0.0, rate=1.0, dtype=dtype) + X = np.array([x_val, x_val], dtype=dtype) + H = np.array([[1.0], [1.0]], dtype=dtype) + block = OptimizationBlock( + component_id=0, params_to_optimize={"loc", "rate"}, maximization_strategy=MaximizationStrategy.MOMENTS + ) + state = PipelineState(X=X, H=H, curr_mixture=None, prev_mixture=None, error=None) + + _, new_params = moments_strategy(component, state, block, optimizer=None) + + assert state.error is not None + assert isinstance(state.error, NumericalStabilityError) + assert "Overflow detected during Moments optimization" in str(state.error) + assert new_params == {} + + +# Property-Based Test with Hypothesis +# ----------------------------------- + + +@st.composite +def exponential_data_and_true_params(draw, dtype_strategy=st.sampled_from([np.float64])): + """ + Generates a true Exponential component and a data sample from it, + all configured with a specific dtype. + """ + dtype = draw(dtype_strategy) + + true_loc = draw(st.floats(min_value=-100, max_value=100)) + true_rate = draw(st.floats(min_value=0.1, max_value=100)) + true_component = Exponential(loc=true_loc, rate=true_rate, dtype=dtype) + + # 2. Generate a data sample from this distribution + sample_size = draw(st.integers(min_value=10000, max_value=10000)) + X = true_component.generate(size=sample_size).astype(dtype) + + return X, dtype(true_loc), dtype(true_rate), dtype + + +@settings(max_examples=50) +@given(data=exponential_data_and_true_params()) +def test_moments_exponential_recovers_true_params_on_ideal_data(data): + """ + Scenario 3 (Statistical Sanity Check): Verifies that the analytical formulas + can recover the original parameters from a sample when responsibilities are 1.0. + This confirms the statistical validity of the implemented formulas in an ideal case. + """ + # --- Arrange --- + X, true_loc, true_rate, dtype = data + + # This is the key assumption for this test: perfect knowledge that all + # data points belong to our component of interest. + H_j = np.ones_like(X, dtype=dtype) + H = np.vstack([H_j, np.zeros_like(H_j)]).T # Simulate a 2-component mixture + + # Use a starting component with completely different parameters + start_component = Exponential(loc=-999.0, rate=0.001, dtype=dtype) + + state = PipelineState(X=X, H=H, curr_mixture=None, prev_mixture=None, error=None) + block = OptimizationBlock( + component_id=0, params_to_optimize={"loc", "rate"}, maximization_strategy=MaximizationStrategy.MOMENTS + ) + + _, new_params = moments_strategy(start_component, state, block, optimizer=None) + + # The estimated parameters won't be exactly the same due to sampling noise, + # so we use pytest.approx with a relative tolerance. + # For a large enough sample, the estimates should be reasonably close. + assert new_params[Exponential.PARAM_LOC] == pytest.approx(true_loc, abs=0.2) + assert new_params[Exponential.PARAM_RATE] == pytest.approx(true_rate, rel=0.1) + + # Verify that the returned parameters have the correct dtype. + assert isinstance(new_params[Exponential.PARAM_LOC], dtype) + assert isinstance(new_params[Exponential.PARAM_RATE], dtype) diff --git a/rework_tests/unit/estimators/iterative/_strategies/moments/test_generalized_moments.py b/rework_tests/unit/estimators/iterative/_strategies/moments/test_generalized_moments.py new file mode 100644 index 0000000..897e44a --- /dev/null +++ b/rework_tests/unit/estimators/iterative/_strategies/moments/test_generalized_moments.py @@ -0,0 +1,76 @@ +"""Tests for the base implementation of the Moments strategy.""" + +__author__ = "Aleksandra Ri" +__copyright__ = "Copyright (c) 2025 PySATL project" +__license__ = "SPDX-License-Identifier: MIT" + + +from unittest.mock import Mock + +import numpy as np +import pytest +from rework_pysatl_mpest.core import Parameter +from rework_pysatl_mpest.distributions import ContinuousDistribution +from rework_pysatl_mpest.estimators.iterative import OptimizationBlock, PipelineState +from rework_pysatl_mpest.estimators.iterative._strategies import moments_strategy +from rework_pysatl_mpest.optimizers import Optimizer + + +class DummyDistribution(ContinuousDistribution): + """ + A simple dummy implementation of ContinuousDistribution for testing purposes. + It has two parameters: 'param1' and 'param2'. + """ + + param1 = Parameter() + param2 = Parameter() + + def __init__(self, param1: float, param2: float, dtype: np.floating = np.float64): + super().__init__(dtype=dtype) + self.param1 = param1 + self.param2 = param2 + + @property + def name(self) -> str: + return "Dummy" + + @property + def params(self) -> set[str]: + return {"param1", "param2"} + + def pdf(self, X): + return np.array([]) + + def ppf(self, P): + return np.array([]) + + def lpdf(self, X): + return np.log(np.array([0.5] * len(X), dtype=self.dtype)) + + def log_gradients(self, X): + return np.array([]) + + def generate(self, size: int): + return np.array([]) + + +def test_moments_strategy_base_implementation_raises_not_implemented(): + """ + Verifies that the base implementation of `moments_strategy` (via singledispatch) + raises a NotImplementedError when called with a distribution that has no + specifically registered implementation. + """ + + # Arrange + dist = DummyDistribution(param1=1.0, param2=2.0) + + # Mocks for arguments that are not used before the error is raised + mock_state = Mock(spec=PipelineState) + mock_block = Mock(spec=OptimizationBlock) + mock_optimizer = Mock(spec=Optimizer) + + expected_error_msg = "Moments strategy is not implemented for distribution 'DummyDistribution'." + + # Act & Assert + with pytest.raises(NotImplementedError, match=expected_error_msg): + moments_strategy(dist, mock_state, mock_block, mock_optimizer)