Skip to content

Commit d7b56dd

Browse files
authored
Merge pull request #20 from PySATL/nm_g_estimation
Normal Mean Mixture Mixing Density Estimation
2 parents 98eee73 + 8c88118 commit d7b56dd

File tree

2 files changed

+106
-0
lines changed

2 files changed

+106
-0
lines changed

src/algorithms/__init__.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
from src.algorithms.semiparam_algorithms.nm_semi_param_algorithms.g_estimation_convolution import (
2+
NMSemiParametricGEstimation,
3+
)
14
from src.algorithms.semiparam_algorithms.nm_semi_param_algorithms.sigma_estimation_eigenvalue_based import (
25
SemiParametricMeanSigmaEstimationEigenvalueBased,
36
)
@@ -28,6 +31,7 @@
2831
ALGORITHM_REGISTRY.register("g_estimation_given_mu", AlgorithmPurpose.NMV_SEMIPARAMETRIC)(
2932
SemiParametricGEstimationGivenMu
3033
)
34+
ALGORITHM_REGISTRY.register("g_estimation_convolution", AlgorithmPurpose.NM_SEMIPARAMETRIC)(NMSemiParametricGEstimation)
3135
ALGORITHM_REGISTRY.register("sigma_estimation_eigenvalue_based", AlgorithmPurpose.NM_SEMIPARAMETRIC)(
3236
SemiParametricMeanSigmaEstimationEigenvalueBased
3337
)
Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
import math
2+
from typing import Any, Callable, Dict, List, Optional, TypedDict, Unpack
3+
4+
import numpy as np
5+
from numpy import _typing
6+
7+
from src.estimators.estimate_result import EstimateResult
8+
9+
SIGMA_DEFAULT_VALUE: float = 1
10+
BOHMAN_N_DEFAULT_VALUE: int = 10000
11+
BOHMAN_DELTA_DEFAULT_VALUE: float = 0.0001
12+
X_DATA_DEFAULT_VALUE: List[float] = [1.0]
13+
14+
15+
class NMSemiParametricGEstimation:
16+
"""Estimation of mixing density function g (xi density function) of NM mixture
17+
represented in canonical form Y = xi + sigma*N.
18+
19+
Args:
20+
sample: Sample data from the analyzed distribution
21+
params: Algorithm parameters including:
22+
- x_data: Evaluation points for density estimation
23+
- sigman
24+
- bohman_n
25+
- bohman_delta
26+
27+
Raises:
28+
ValueError: If input sample is empty or invalid parameters provided
29+
"""
30+
31+
class ParamsAnnotation(TypedDict, total=False):
32+
x_data: List[float]
33+
sigma: float
34+
bohman_n: int
35+
bohman_delta: float
36+
37+
def __init__(self, sample: Optional[_typing.NDArray[np.float64]] = None, **kwargs: Unpack[ParamsAnnotation]):
38+
self.sample: _typing.NDArray[np.float64] = np.array([]) if sample is None else sample
39+
self.n: int = len(self.sample)
40+
(
41+
self.x_data,
42+
self.sigma,
43+
self.bohman_n,
44+
self.bohman_delta,
45+
) = self._validate_kwargs(**kwargs)
46+
47+
@staticmethod
48+
def _validate_kwargs(**kwargs: Unpack[ParamsAnnotation]) -> tuple[List[float], float, int, float]:
49+
x_data: List[float] = kwargs.get("x_data", X_DATA_DEFAULT_VALUE)
50+
sigma: float = kwargs.get("sigma", SIGMA_DEFAULT_VALUE)
51+
bohman_n: int = kwargs.get("bohman_n", BOHMAN_N_DEFAULT_VALUE)
52+
bohman_delta: float = kwargs.get("bohman_delta", BOHMAN_DELTA_DEFAULT_VALUE)
53+
return x_data, sigma, bohman_n, bohman_delta
54+
55+
def characteristic_function_mixture(self, t: float) -> complex:
56+
smm = 0
57+
for i in range(self.n):
58+
smm += np.exp(1j * t * self.sample[i])
59+
return smm / self.n
60+
61+
def characteristic_function_normal(self, t: float) -> complex:
62+
return np.exp(-0.5 * (self.sigma**2) * t**2)
63+
64+
def characteristic_function_xi(self, t: float) -> complex:
65+
denominator = np.maximum(np.abs(self.characteristic_function_normal(self.sigma * t)), 1e-10)
66+
return self.characteristic_function_mixture(t) / denominator
67+
68+
class BohmanA:
69+
def __init__(self, N: int = int(1e3), delta: float = 1e-1) -> None:
70+
super().__init__()
71+
self.N: int = N
72+
self.delta: float = delta
73+
self.coeff_0: float = 0.5
74+
self.coeff_1: float = 0.0
75+
self.coeff: np.ndarray = np.array([])
76+
77+
def fit(self, phi: Callable) -> None:
78+
self.coeff_0 = 0.5
79+
self.coeff_1 = self.delta / (2 * np.pi)
80+
81+
v_values = np.arange(1 - self.N, self.N)
82+
v_values = v_values[v_values != 0]
83+
84+
self.coeff = phi(self.delta * v_values) / (2 * np.pi * 1j * v_values)
85+
86+
def cdf(self, X: np.ndarray) -> np.ndarray:
87+
v = np.arange(1 - self.N, self.N)
88+
v_non_zero = v[v != 0]
89+
90+
x_vect = np.outer(X, v_non_zero)
91+
92+
F_x = self.coeff_0 + X * self.coeff_1 + (-np.exp(-1j * self.delta * x_vect) @ self.coeff)
93+
94+
return F_x.real
95+
96+
def algorithm(self, sample: _typing.NDArray[np.float64]) -> EstimateResult:
97+
inv = self.BohmanA(N=self.bohman_n, delta=self.bohman_delta)
98+
inv.fit(self.characteristic_function_xi)
99+
x_data_array = np.array(self.x_data, dtype=np.float64)
100+
estimated_cdf = inv.cdf(x_data_array)
101+
estimated_pdf = np.gradient(estimated_cdf, x_data_array)
102+
return EstimateResult(list_value=estimated_pdf.tolist(), success=True)

0 commit comments

Comments
 (0)