Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 13 additions & 9 deletions rework_pysatl_mpest/core/mixture.py
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ def remove_component(self, component_idx: int):
self._cached_weights = None
self._sorted_pairs_cache = None

def pdf(self, X: ArrayLike) -> NDArray[DType]:
def pdf(self, X: ArrayLike) -> DType | NDArray[DType]:
"""Probability Density Function of the mixture.

The PDF is computed as the weighted sum of the PDFs of its
Expand All @@ -268,15 +268,16 @@ def pdf(self, X: ArrayLike) -> NDArray[DType]:

Returns
-------
NDArray[DType]
DType | NDArray[DType]
The PDF values corresponding to each point in :attr:`X`.
Return a scalar when given a scalar, and to return an array when given an array.
"""

X = np.asarray(X, dtype=self.dtype)
component_pdfs = np.array([comp.pdf(X) for comp in self.components])
return np.asarray(np.dot(self.weights, component_pdfs))
return np.dot(self.weights, component_pdfs)

def lpdf(self, X: ArrayLike) -> NDArray[DType]:
def lpdf(self, X: ArrayLike) -> DType | NDArray[DType]:
"""Logarithms of the Probability Density Function.

Parameters
Expand All @@ -286,15 +287,18 @@ def lpdf(self, X: ArrayLike) -> NDArray[DType]:

Returns
-------
NDArray[DType]
DType | NDArray[DType]
The log-PDF values corresponding to each point in :attr:`X`.
Return a scalar when given a scalar, and to return an array when given an array.
"""

X = np.atleast_1d(X).astype(self.dtype)
X = np.asarray(X, dtype=self.dtype)
component_lpdfs = np.array([comp.lpdf(X) for comp in self.components])
log_weights = self.log_weights
log_terms = log_weights[:, np.newaxis] + component_lpdfs
return logsumexp(log_terms, axis=0) # type: ignore
broadcast_shape = (self.n_components,) + (1,) * X.ndim
log_weights = self.log_weights.reshape(broadcast_shape)
log_terms = log_weights + component_lpdfs

return logsumexp(log_terms, axis=0)

def loglikelihood(self, X: ArrayLike) -> DType:
"""Log-likelihood of the complete data :attr:`X`.
Expand Down
66 changes: 53 additions & 13 deletions rework_pysatl_mpest/distributions/beta.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,10 +109,12 @@ def pdf(self, X):

Returns
-------
NDArray[DType]
DType | NDArray[DType]
The PDF values corresponding to each point in :attr:`X`.
Return a scalar when given a scalar, and to return an array when given an array.

"""

X = np.asarray(X, dtype=self.dtype)
return np.exp(self.lpdf(X))

Expand All @@ -137,13 +139,16 @@ def ppf(self, P):

Returns
-------
NDArray[DType]
DType | NDArray[DType]
The PPF values corresponding to each probability in :attr:`P`.
Return a scalar when given a scalar, and to return an array when given an array.
"""

is_scalar = np.isscalar(P)
P = np.asarray(P, dtype=self.dtype)
dtype = self.dtype

return np.where(
result = np.where(
(P >= 0) & (P <= 1),
(
self.left_border
Expand All @@ -152,6 +157,10 @@ def ppf(self, P):
dtype(np.nan),
)

if is_scalar:
return result[()]
return result

def lpdf(self, X):
"""Log of the Probability Density Function (LPDF).

Expand All @@ -177,8 +186,9 @@ def lpdf(self, X):

Returns
-------
NDArray[DType]
DType | NDArray[DType]
The log-PDF values corresponding to each point in :attr:`X`.
Return a scalar when given a scalar, and to return an array when given an array.
"""

X = np.asarray(X, dtype=self.dtype)
Expand All @@ -189,7 +199,7 @@ def lpdf(self, X):
log_pdf_standard = beta_dist.logpdf(Z, self.alpha, self.beta).astype(dtype)
result = log_pdf_standard - np.log(self.right_border - self.left_border)

return np.atleast_1d(result)
return result

def _dlog_alpha(self, X):
"""Partial derivative of the lpdf w.r.t. the :attr:`alpha` parameter.
Expand All @@ -214,22 +224,28 @@ def _dlog_alpha(self, X):

Returns
-------
NDArray[DType]
DType | NDArray[DType]
The gradient of the lpdf with respect to :attr:`alpha` for each point in :attr:`X`.
Return a scalar when given a scalar, and to return an array when given an array.
"""

is_scalar = np.isscalar(X)
X = np.asarray(X, dtype=self.dtype)
dtype = self.dtype

in_bounds = (self.left_border < X) & (self.right_border >= X)
return np.where(
result = np.where(
in_bounds,
np.log(X - self.left_border)
- np.log(self.right_border - self.left_border)
- (dtype(digamma(self.alpha)) - dtype(digamma(self.alpha + self.beta))),
dtype(0.0),
)

if is_scalar:
return result[()]
return result

def _dlog_beta(self, X):
"""Partial derivative of the lpdf w.r.t. the :attr:`beta` parameter.

Expand All @@ -253,22 +269,28 @@ def _dlog_beta(self, X):

Returns
-------
NDArray[DType]
DType | NDArray[DType]
The gradient of the lpdf with respect to :attr:`beta` for each point in :attr:`X`.
Return a scalar when given a scalar, and to return an array when given an array.
"""

is_scalar = np.isscalar(X)
X = np.asarray(X, dtype=self.dtype)
dtype = self.dtype

in_bounds = (self.left_border < X) & (self.right_border >= X)
return np.where(
result = np.where(
in_bounds,
np.log(self.right_border - X)
- np.log(self.right_border - self.left_border)
- (dtype(digamma(self.beta)) - dtype(digamma(self.alpha + self.beta))),
dtype(0.0),
)

if is_scalar:
return result[()]
return result

def _dlog_left_border(self, X):
"""Partial derivative of the lpdf w.r.t. the :attr:`left_border` parameter.

Expand All @@ -290,15 +312,17 @@ def _dlog_left_border(self, X):

Returns
-------
NDArray[DType]
DType | NDArray[DType]
The gradient of the lpdf with respect to :attr:`left_border` for each point in :attr:`X`.
Return a scalar when given a scalar, and to return an array when given an array.
"""

is_scalar = np.isscalar(X)
X = np.asarray(X, dtype=self.dtype)
dtype = self.dtype

in_bounds = (self.left_border < X) & (self.right_border >= X)
return np.where(
result = np.where(
in_bounds,
(
((self.alpha + self.beta - dtype(1)) / (self.right_border - self.left_border))
Expand All @@ -307,6 +331,10 @@ def _dlog_left_border(self, X):
dtype(0.0),
)

if is_scalar:
return result[()]
return result

def _dlog_right_border(self, X):
"""Partial derivative of the lpdf w.r.t. the :attr:`right_border` parameter.

Expand All @@ -328,14 +356,17 @@ def _dlog_right_border(self, X):

Returns
-------
NDArray[DType]
DType | NDArray[DType]
The gradient of the lpdf with respect to :attr:`right_border` for each point in :attr:`X`.
Return a scalar when given a scalar, and to return an array when given an array.
"""

is_scalar = np.isscalar(X)
X = np.asarray(X, dtype=self.dtype)
dtype = self.dtype

in_bounds = (self.left_border < X) & (self.right_border >= X)
return np.where(
result = np.where(
in_bounds,
(
((self.beta - dtype(1)) / (self.right_border - X))
Expand All @@ -344,6 +375,10 @@ def _dlog_right_border(self, X):
dtype(0.0),
)

if is_scalar:
return result[()]
return result

def log_gradients(self, X):
"""Calculates the gradients of the log-PDF w.r.t. its parameters.

Expand All @@ -361,7 +396,10 @@ def log_gradients(self, X):
and each column corresponds to the gradient with respect to a
specific optimizable parameter. The order of columns corresponds
to the sorted order of :attr:`self.params_to_optimize`.
Returns a 1D array if X is a scalar.
"""

is_scalar = np.isscalar(X)
X = np.asarray(X, dtype=self.dtype)

gradient_calculators = {
Expand All @@ -378,6 +416,8 @@ def log_gradients(self, X):

gradients = [gradient_calculators[param](X) for param in optimizable_params]

if is_scalar:
return np.array(gradients)
return np.stack(gradients, axis=1)

def generate(self, size: int):
Expand Down
46 changes: 35 additions & 11 deletions rework_pysatl_mpest/distributions/cauchy.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,14 +82,13 @@ def pdf(self, X):

Returns
-------
NDArray[DType]
DType | NDArray[DType]
The PDF values corresponding to each point in :attr:`X`.
Return a scalar when given a scalar, and to return an array when given an array.
"""

X = np.asarray(X, dtype=self.dtype)
dtype = self.dtype

return dtype(1.0) / (dtype(np.pi) * self.scale * (dtype(1.0) + ((X - self.loc) / self.scale) ** 2))
return np.exp(self.lpdf(X))

def ppf(self, P):
"""Percent Point Function (PPF) or quantile function.
Expand All @@ -110,13 +109,16 @@ def ppf(self, P):

Returns
-------
NDArray[DType]
DType | NDArray[DType]
The PPF values corresponding to each probability in :attr:`P`.
Return a scalar when given a scalar, and to return an array when given an array.
"""

is_scalar = np.isscalar(P)
P = np.asarray(P, dtype=self.dtype)
dtype = self.dtype

return np.where(
result = np.where(
(P >= 0) & (P <= 1),
np.where(
(P == 0) | (P == 1),
Expand All @@ -125,6 +127,9 @@ def ppf(self, P):
),
dtype(np.nan),
)
if is_scalar:
return result[()]
return result

def lpdf(self, X):
"""Log of the Probability Density Function (LPDF).
Expand All @@ -145,8 +150,9 @@ def lpdf(self, X):

Returns
-------
NDArray[DType]
DType | NDArray[DType]
The log-PDF values corresponding to each point in :attr:`X`.
Return a scalar when given a scalar, and to return an array when given an array.
"""

X = np.asarray(X, dtype=self.dtype)
Expand Down Expand Up @@ -179,14 +185,20 @@ def _dlog_loc(self, X):

Returns
-------
NDArray[DType]
DType | NDArray[DType]
The gradient of the lpdf with respect to :attr:`loc` for each point in ::attr`X`.
Return a scalar when given a scalar, and to return an array when given an array.
"""

is_scalar = np.isscalar(X)
X = np.asarray(X, dtype=self.dtype)
dtype = self.dtype

return (dtype(2) * X - dtype(2) * self.loc) / (self.scale**2 + X**2 - dtype(2) * self.loc * X + self.loc**2)
result = (dtype(2) * X - dtype(2) * self.loc) / (self.scale**2 + X**2 - dtype(2) * self.loc * X + self.loc**2)

if is_scalar:
return result[()]
return result

def _dlog_scale(self, X):
"""Partial derivative of the lpdf w.r.t. the :attr:`scale` parameter.
Expand All @@ -208,16 +220,23 @@ def _dlog_scale(self, X):

Returns
-------
NDArray[DType]
DType | NDArray[DType]
The gradient of the lpdf with respect to :attr:`rate` for each point in :attr:`X`.
Return a scalar when given a scalar, and to return an array when given an array.
"""

is_scalar = np.isscalar(X)
X = np.asarray(X, dtype=self.dtype)
dtype = self.dtype

return (-(self.scale**2) + X**2 - dtype(2) * self.loc * X + self.loc**2) / (
result = (-(self.scale**2) + X**2 - dtype(2) * self.loc * X + self.loc**2) / (
self.scale**3 + self.scale * (X**2) - dtype(2) * self.loc * self.scale * X + self.scale * self.loc**2
)

if is_scalar:
return result[()]
return result

def log_gradients(self, X):
"""Calculates the gradients of the log-PDF w.r.t. its parameters.

Expand All @@ -235,7 +254,10 @@ def log_gradients(self, X):
and each column corresponds to the gradient with respect to a
specific optimizable parameter. The order of columns corresponds
to the sorted order of :attr:`self.params_to_optimize`.
Returns a 1D array if X is a scalar.
"""

is_scalar = np.isscalar(X)
X = np.asarray(X, dtype=self.dtype)

gradient_calculators = {
Expand All @@ -250,6 +272,8 @@ def log_gradients(self, X):

gradients = [gradient_calculators[param](X) for param in optimizable_params]

if is_scalar:
return np.array(gradients)
return np.stack(gradients, axis=1)

def generate(self, size: int):
Expand Down
Loading