Skip to content
Open
2 changes: 1 addition & 1 deletion jupiter_examples/nm_sigma_estimation_comparison.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@
" \"\"\"\n",
" generator = NMGenerator()\n",
" mixture = NormalMeanMixtures(\"canonical\", sigma=real_sigma, distribution=distribution)\n",
" return generator.canonical_generate(mixture, sample_len)\n",
" return generator.generate(mixture, sample_len)\n",
"\n",
"def estimate_sigma_eigenvalue_based(sample, real_sigma, search_area, a, b):\n",
" sample_len = len(sample)\n",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@
from scipy.integrate import quad_vec
from scipy.special import gamma

from src.algorithms.support_algorithms.rqmc import RQMC
from src.algorithms.support_algorithms.integrator import Integrator
from src.algorithms.support_algorithms.rqmc import RQMCIntegrator
from src.estimators.estimate_result import EstimateResult

MU_DEFAULT_VALUE = 1.0
Expand Down Expand Up @@ -68,7 +69,7 @@ def __init__(self, sample: Optional[_typing.NDArray[np.float64]] = None, **kwarg
self.x_data,
self.grid_size,
self.integration_tolerance,
self.integration_limit,
self.integration_limit
) = self._validate_kwargs(self.n, **kwargs)
self.denominator: float = 2 * math.pi * self.n
self.precompute_gamma_grid()
Expand Down Expand Up @@ -162,15 +163,15 @@ def second_v_integrand(self, v: float, x: float) -> np.ndarray:
x_power = self.x_powers[x][idx]
return (self.second_u_integrals[idx] * x_power) / gamma_val

def compute_integrals_for_x(self, x: float) -> float:
def compute_integrals_for_x(self, x: float, integrator: Integrator = RQMCIntegrator()) -> float:
"""Compute integrals using RQMC for v-integration."""
first_integral = RQMC(lambda t: np.sum(self.first_v_integrand(t * self.v_value, x)) * self.v_value).rqmc()[0]
first_integral = integrator.compute(func=lambda t: np.sum(self.first_v_integrand(t * self.v_value, x)) * self.v_value).value

second_integral = RQMC(lambda t: np.sum(self.second_v_integrand(-t * self.v_value, x)) * self.v_value).rqmc()[0]
second_integral = integrator.compute(func=lambda t: np.sum(self.second_v_integrand(-t * self.v_value, x)) * self.v_value).value

total = (first_integral + second_integral) / self.denominator
return max(0.0, total.real)

def algorithm(self, sample: np._typing.NDArray) -> EstimateResult:
y_data = [self.compute_integrals_for_x(x) for x in self.x_data]
y_data = [self.compute_integrals_for_x(x, RQMCIntegrator()) for x in self.x_data]
return EstimateResult(list_value=y_data, success=True)
20 changes: 20 additions & 0 deletions src/algorithms/support_algorithms/integrator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
from dataclasses import dataclass
from typing import Any, Protocol, Callable, Optional


@dataclass
class IntegrationResult:
value: float
error: float
message: Optional[dict[str, Any]] | None = None


class Integrator(Protocol):

"""Base class for integral calculation"""

def __init__(self) -> None:
...

def compute(self, func: Callable) -> IntegrationResult:
...
60 changes: 60 additions & 0 deletions src/algorithms/support_algorithms/quad_integrator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
from typing import Callable, Any, Sequence

from scipy.integrate import quad
from src.algorithms.support_algorithms.integrator import IntegrationResult

class QuadIntegrator:

def __init__(
self,
a: float = 0,
b: float = 1,
args: tuple[Any, ...] = (),
full_output: int = 0,
epsabs: float | int = 1.49e-08,
epsrel: float | int = 1.49e-08,
limit: float | int = 50,
points: Sequence[float | int] | None = None,
weight: float | int | None = None,
wvar: Any = None,
wopts: Any = None,
maxp1: float | int = 50,
limlst: int = 50,
complex_func: bool = False,
):
self.params = {
'a': a,
'b': b,
'args': args,
'full_output': full_output,
'epsabs': epsabs,
'epsrel': epsrel,
'limit': limit,
'points': points,
'weight': weight,
'wvar': wvar,
'wopts': wopts,
'maxp1': maxp1,
'limlst': limlst,
'complex_func': complex_func
}

def compute(self, func: Callable) -> IntegrationResult:

"""
Compute integral via quad integrator

Args:
func: integrated function

Returns: moment approximation and error tolerance
"""

verbose = self.params.pop('full_output', False)
result = quad(func, **self.params)
if verbose:
value, error, message = result
else:
value, error = result
message = None
return IntegrationResult(value, error, message)
59 changes: 55 additions & 4 deletions src/algorithms/support_algorithms/rqmc.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
import scipy
from numba import njit

from src.algorithms.support_algorithms.integrator import IntegrationResult

BITS = 30
"""Number of bits in XOR. Should be less than 64"""
NUMBA_FAST_MATH = True
Expand Down Expand Up @@ -126,7 +128,8 @@ def _update(

Returns:Updated mean of all rows

"""

"""
values = []
sum_of_new: float = 0.0
for i in range(self.count):
Expand Down Expand Up @@ -212,9 +215,9 @@ def _xor_float(a: float, b: float) -> float:
Returns: XOR float value

"""
a = int(a * (2**BITS))
b = int(b * (2**BITS))
return np.bitwise_xor(a, b) / 2**BITS
a = int(a * (2 ** BITS))
b = int(b * (2 ** BITS))
return np.bitwise_xor(a, b) / 2 ** BITS

def __call__(self) -> tuple[float, float]:
"""Interface for users
Expand All @@ -223,3 +226,51 @@ def __call__(self) -> tuple[float, float]:

"""
return self.rqmc()


class RQMCIntegrator:
"""
Randomize Quasi Monte Carlo Method

Args:
error_tolerance: pre-specified error tolerance
count: number of rows of random values matrix
base_n: number of columns of random values matrix
i_max: allowed number of cycles
a: parameter for quantile of normal distribution

"""

def __init__(
self,
error_tolerance: float = 1e-6,
count: int = 25,
base_n: int = 2 ** 6,
i_max: int = 100,
a: float = 0.00047,
):
self.error_tolerance = error_tolerance
self.count = count
self.base_n = base_n
self.i_max = i_max
self.a = a

def compute(self, func: Callable) -> IntegrationResult:
"""
Compute integral via RQMC integrator

Args:
func: integrated function

Returns: moment approximation and error tolerance
"""
result = RQMC(
func,
error_tolerance=self.error_tolerance,
count=self.count,
base_n=self.base_n,
i_max=self.i_max,
a=self.a,
)()
return IntegrationResult(result[0], result[1])

25 changes: 3 additions & 22 deletions src/generators/nm_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
class NMGenerator(AbstractGenerator):

@staticmethod
def classical_generate(mixture: AbstractMixtures, size: int) -> tpg.NDArray:
def generate(mixture: AbstractMixtures, size: int) -> tpg.NDArray:
"""Generate a sample of given size. Classical form of NMM

Args:
Expand All @@ -27,25 +27,6 @@ def classical_generate(mixture: AbstractMixtures, size: int) -> tpg.NDArray:
raise ValueError("Mixture must be NormalMeanMixtures")
mixing_values = mixture.params.distribution.rvs(size=size)
normal_values = scipy.stats.norm.rvs(size=size)
if mixture.mixture_form == "canonical":
return mixing_values + mixture.params.sigma * normal_values
return mixture.params.alpha + mixture.params.beta * mixing_values + mixture.params.gamma * normal_values

@staticmethod
def canonical_generate(mixture: AbstractMixtures, size: int) -> tpg.NDArray:
"""Generate a sample of given size. Canonical form of NMM

Args:
mixture: Normal Mean Mixture
size: length of sample

Returns: sample of given size

Raises:
ValueError: If mixture is not a Normal Mean Mixture

"""

if not isinstance(mixture, NormalMeanMixtures):
raise ValueError("Mixture must be NormalMeanMixtures")
mixing_values = mixture.params.distribution.rvs(size=size)
normal_values = scipy.stats.norm.rvs(size=size)
return mixing_values + mixture.params.sigma * normal_values
27 changes: 4 additions & 23 deletions src/generators/nmv_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
class NMVGenerator(AbstractGenerator):

@staticmethod
def classical_generate(mixture: AbstractMixtures, size: int) -> tpg.NDArray:
def generate(mixture: AbstractMixtures, size: int) -> tpg.NDArray:
"""Generate a sample of given size. Classical form of NMVM

Args:
Expand All @@ -27,29 +27,10 @@ def classical_generate(mixture: AbstractMixtures, size: int) -> tpg.NDArray:
raise ValueError("Mixture must be NormalMeanMixtures")
mixing_values = mixture.params.distribution.rvs(size=size)
normal_values = scipy.stats.norm.rvs(size=size)
if mixture.mixture_form == "canonical":
return mixture.params.alpha + mixture.params.mu * mixing_values + (mixing_values ** 0.5) * normal_values
return (
mixture.params.alpha
+ mixture.params.beta * mixing_values
+ mixture.params.gamma * (mixing_values**0.5) * normal_values
)

@staticmethod
def canonical_generate(mixture: AbstractMixtures, size: int) -> tpg.NDArray:
"""Generate a sample of given size. Canonical form of NMVM

Args:
mixture: Normal Mean Variance Mixtures
size: length of sample

Returns: sample of given size

Raises:
ValueError: If mixture type is not Normal Mean Variance Mixtures

"""

if not isinstance(mixture, NormalMeanVarianceMixtures):
raise ValueError("Mixture must be NormalMeanMixtures")
mixing_values = mixture.params.distribution.rvs(size=size)
normal_values = scipy.stats.norm.rvs(size=size)
return mixture.params.alpha + mixture.params.mu * mixing_values + (mixing_values**0.5) * normal_values
)
27 changes: 4 additions & 23 deletions src/generators/nv_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
class NVGenerator(AbstractGenerator):

@staticmethod
def classical_generate(mixture: AbstractMixtures, size: int) -> tpg.NDArray:
def generate(mixture: AbstractMixtures, size: int) -> tpg.NDArray:
"""Generate a sample of given size. Classical form of NVM

Args:
Expand All @@ -27,25 +27,6 @@ def classical_generate(mixture: AbstractMixtures, size: int) -> tpg.NDArray:
raise ValueError("Mixture must be NormalMeanMixtures")
mixing_values = mixture.params.distribution.rvs(size=size)
normal_values = scipy.stats.norm.rvs(size=size)
return mixture.params.alpha + mixture.params.gamma * (mixing_values**0.5) * normal_values

@staticmethod
def canonical_generate(mixture: AbstractMixtures, size: int) -> tpg.NDArray:
"""Generate a sample of given size. Canonical form of NVM

Args:
mixture: Normal Variance Mixtures
size: length of sample

Returns: sample of given size

Raises:
ValueError: If mixture type is not Normal Variance Mixtures

"""

if not isinstance(mixture, NormalVarianceMixtures):
raise ValueError("Mixture must be NormalMeanMixtures")
mixing_values = mixture.params.distribution.rvs(size=size)
normal_values = scipy.stats.norm.rvs(size=size)
return mixture.params.alpha + (mixing_values**0.5) * normal_values
if mixture.mixture_form == "canonical":
return mixture.params.alpha + (mixing_values ** 0.5) * normal_values
return mixture.params.alpha + mixture.params.gamma * (mixing_values**0.5) * normal_values
Loading
Loading