Closed
Changes from all commits
32 commits
4862341
feat(kliep): rewrite kliep algorithm implementation
alistkova Mar 26, 2025
f6348e9
refactor(algorithms): refactoring changes rebase.
alexdtat Mar 18, 2025
b331aa6
fix(algorithms): typing fix from List to list.
alexdtat Mar 18, 2025
02ebaba
refactor(algorithms): change of vision scope modifiers.
alexdtat Mar 18, 2025
7b49a81
fix(algorithms): change learning in Bayesian online CPD.
alexdtat Mar 18, 2025
3f0e3ff
refactor(bayesian algorithm): typing an naming changes
alexdtat Mar 26, 2025
067211f
refactor(bayesian algorithm): typing an naming changes
alexdtat Mar 26, 2025
53535e8
refactor(bayesian algorithm): detectors and localizers renaming.
alexdtat Mar 26, 2025
ef11990
docs(bayesian algorithm): license change.
alexdtat Mar 26, 2025
fdace5e
feat(algorithms): add online-algorithms related functionality
alexdtat Mar 30, 2025
b0f752f
fix(tests): fix import in cpd_solver tests
alexdtat Mar 30, 2025
f7e47fb
test(online-cpd): add tests to online-algorithms related functionality
alexdtat Mar 31, 2025
f70d1cb
refactor(bayesian online algorithm): add asserts
alexdtat Apr 1, 2025
033c0a5
fix(bayesian online algorithm): fix learning behavior after change po…
alexdtat Apr 1, 2025
cc05b16
docs(readme): update info about bayesian algorithm
alexdtat Apr 1, 2025
df400b4
refactor(online cpd solver): add suggested cp processing optimization
alexdtat Apr 5, 2025
d982093
fix(gaussian likelihood): change deprecated decorator
alexdtat Apr 5, 2025
1145426
docs(online algorithms): clarify localization method docstring
alexdtat Apr 5, 2025
2f19c50
chore(gitignore): add ./idea to gitignore
alexdtat Apr 5, 2025
714fe90
fix(kliep): typing fix
alistkova Apr 8, 2025
048b46d
Merge branch 'PySATL:main' into main
alistkova Apr 8, 2025
0b80f9e
revert merge
alistkova Apr 8, 2025
36433ff
fix(kliep): organize imports
alistkova Apr 8, 2025
e130d77
fix: correct type annotations
alistkova May 3, 2025
c8e9405
style: modernize type annotations and clean imports
alistkova May 3, 2025
fecf34f
style: organize imports
alistkova May 3, 2025
b8fd7f5
style: organize imports
alistkova May 3, 2025
9d18ad1
fix: np type error
alistkova May 29, 2025
469d94f
fix: upd density based
alistkova May 29, 2025
097fcf9
feat: rewrite rulsif algorithm implementation
alistkova May 29, 2025
155aabd
fix: correct minimize call and remove unused type ignore
alistkova May 29, 2025
67add9b
fix: delete unused type ignore comment
alistkova May 29, 2025
189 changes: 116 additions & 73 deletions pysatl_cpd/core/algorithms/density/abstracts/density_based_algorithm.py
@@ -1,112 +1,155 @@
 from abc import abstractmethod
-from collections.abc import Callable
-from typing import TypeAlias
+from typing import Any, Callable, TypeAlias

 import numpy as np
 import numpy.typing as npt
-from scipy.optimize import minimize

 from pysatl_cpd.core.algorithms.abstract_algorithm import Algorithm

 _TObjFunc: TypeAlias = Callable[[npt.NDArray[np.float64], npt.NDArray[np.float64]], float]
 _TMetrics: TypeAlias = dict[str, int | float]


 class DensityBasedAlgorithm(Algorithm):
-    @staticmethod
-    def _kernel_density_estimation(observation: npt.NDArray[np.float64], bandwidth: float) -> npt.NDArray[np.float64]:
-        """Perform kernel density estimation on the given observations without fitting a model.
-
-        :param observation: the data points for which to estimate the density.
-        :param bandwidth: the bandwidth parameter for the kernel density estimation.
-
-        :return: estimated density values for the observations.
-        """
-        n = len(observation)
-        x_grid = np.linspace(np.min(observation) - 3 * bandwidth, np.max(observation) + 3 * bandwidth, 1000)
-        kde_values = np.zeros_like(x_grid)
-        for x in observation:
-            kde_values += np.exp(-0.5 * ((x_grid - x) / bandwidth) ** 2)
-
-        kde_values /= n * bandwidth * np.sqrt(2 * np.pi)
-        return kde_values
-
-    def _calculate_weights(
-        self,
-        test_value: npt.NDArray[np.float64],
-        reference_value: npt.NDArray[np.float64],
-        bandwidth: float,
-        objective_function: _TObjFunc,
-    ) -> npt.NDArray[np.float64]:
-        """Calculate the weights based on the density ratio between test and reference values.
-
-        :param test_value: the test data points.
-        :param reference_value: the reference data points.
-        :param bandwidth: the bandwidth parameter for the kernel density estimation.
-        :param objective_function: the objective function to minimize.
-
-        :return: the calculated density ratios normalized to their mean.
-        """
-        test_density = self._kernel_density_estimation(test_value, bandwidth)
-        reference_density = self._kernel_density_estimation(reference_value, bandwidth)
-
-        def objective_function_wrapper(alpha: npt.NDArray[np.float64], /) -> float:
-            """Wrapper for the objective function to calculate the density ratio.
-
-            :param alpha: relative parameter that controls the weighting between the numerator distribution
-                and the denominator distribution in the density ratio estimation.
-
-            :return: the value of the objective function to minimize.
-            """
-            objective_density_ratio = np.exp(test_density - reference_density - alpha)
-            return objective_function(objective_density_ratio, alpha)
-
-        res = minimize(objective_function_wrapper, np.zeros(len(test_value)), method="L-BFGS-B")
-        optimized_alpha: npt.NDArray[np.float64] = res.x
-        density_ratio: npt.NDArray[np.float64] = np.exp(test_density - reference_density - optimized_alpha)
-        return density_ratio / np.mean(density_ratio)
-
-    @abstractmethod
-    def detect(self, window: npt.NDArray[np.float64]) -> int:
-        # maybe rtype tuple[int]
-        """Function for finding change points in window
-
-        :param window: part of global data for finding change points
-        :return: list of right borders of window change points
-        """
-        raise NotImplementedError
-
-    @abstractmethod
-    def localize(self, window: npt.NDArray[np.float64]) -> list[int]:
-        """Function for finding coordinates of change points in window
-
-        :param window: part of global data for finding change points
-        :return: list of window change points
-        """
-        raise NotImplementedError
+    """Abstract base class for density-based change point detection algorithms.
+
+    Provides common infrastructure for methods that detect change points by
+    analyzing probability density changes in data segments.
+    """
+
+    def __init__(
+        self,
+        min_window_size: int = 10,
+        threshold: float = 1.1,
+        bandwidth: float = 1.0
+    ) -> None:
+        """
+        Initializes density-based change point detector.
+
+        :param min_window_size: minimum data points required in each segment
+        :param threshold: detection sensitivity (higher = fewer detections)
+        :param bandwidth: kernel bandwidth for density estimation
+        """
+        self.min_window_size = min_window_size
+        self.threshold = threshold
+        self.bandwidth = bandwidth
+
+    def detect(self, window: npt.NDArray[np.float64]) -> int:
+        """Counts change points in the given data window.
+
+        :param window: input data array (1D or 2D)
+        :return: number of detected change points
+        """
+        return len(self.localize(window))
+
+    def localize(self, window: npt.NDArray[np.float64]) -> list[int]:
+        """Identifies positions of change points in the data window.
+
+        :param window: input data array (1D or 2D)
+        :return: list of change point indices
+        """
+        window = self._validate_window(window)
+        if not self._is_window_valid(window):
+            return []
+        scores = self._compute_scores(window)
+        return self._find_change_points(scores)
+
+    def _validate_window(self, window: npt.NDArray[Any]) -> npt.NDArray[np.float64]:
+        """Ensures input window meets processing requirements.
+
+        :param window: raw input data
+        :return: validated 2D float64 array
+        """
+        window_arr = np.asarray(window, dtype=np.float64)
+        if window_arr.ndim == 1:
+            window_arr = window_arr.reshape(-1, 1).astype(np.float64)
+        return np.array(window_arr, dtype=np.float64)
+
+    def _find_change_points(self, scores: npt.NDArray[np.float64]) -> list[int]:
+        """Filters candidate points using threshold and minimum separation.
+
+        :param scores: change point scores for each position
+        :return: filtered list of change point indices
+        """
+        candidates = np.where(scores > self.threshold)[0]
+        if not candidates.size:
+            return []
+        change_points = [int(candidates[0])]
+        for point in candidates[1:]:
+            if point - change_points[-1] > self.min_window_size:
+                change_points.append(int(point))
+        return change_points
+
+    def _is_window_valid(self, window: npt.NDArray[np.float64]) -> bool:
+        """Verifies window meets minimum size requirements.
+
+        :param window: input data window
+        :return: True if window can be processed, else False
+        """
+        return len(window) >= 2 * self.min_window_size
+
+    @staticmethod
+    def _kernel_density_estimation(
+        observation: npt.NDArray[np.float64],
+        bandwidth: float
+    ) -> npt.NDArray[np.float64]:
+        """Computes kernel density estimate using Gaussian kernels.
+
+        :param observation: data points for density estimation
+        :param bandwidth: smoothing parameter for KDE
+        :return: density values at evaluation points
+        """
+        n = observation.shape[0]
+        x_grid: npt.NDArray[np.float64] = np.linspace(
+            np.min(observation) - 3*bandwidth,
+            np.max(observation) + 3*bandwidth,
+            1000,
+            dtype=np.float64
+        )
+
+        diff = x_grid[:, np.newaxis] - observation
+        kernel_vals = np.exp(-0.5 * (diff / bandwidth) ** 2)
+        kde_vals = kernel_vals.sum(axis=1)
+
+        return np.asarray(kde_vals / (n * bandwidth * np.sqrt(2*np.pi)), dtype=np.float64)
+
+    @abstractmethod
+    def _compute_scores(self, window: npt.NDArray[np.float64]) -> npt.NDArray[np.float64]:
+        """Computes change point scores.
+
+        :param window: validated input data
+        :return: array of change point scores
+        """
+        raise NotImplementedError

     @staticmethod
-    def evaluate_detection_accuracy(true_change_points: list[int], detected_change_points: list[int]) -> _TMetrics:
-        """Evaluate the accuracy of change point detection.
-
-        :param true_change_points: list of true change point indices.
-        :param detected_change_points: list of detected change point indices.
-
-        :return: a dictionary with evaluation metrics (precision, recall, F1 score).
-        """
-        true_positive = len(set(true_change_points) & set(detected_change_points))
-        false_positive = len(set(detected_change_points) - set(true_change_points))
-        false_negative = len(set(true_change_points) - set(detected_change_points))
-
-        precision = true_positive / (true_positive + false_positive) if true_positive + false_positive > 0 else 0.0
-        recall = true_positive / (true_positive + false_negative) if true_positive + false_negative > 0 else 0.0
-        f1_score = (2 * precision * recall / (precision + recall)) if (precision + recall) > 0 else 0.0
+    def evaluate_detection_accuracy(
+        true_change_points: list[int],
+        detected_change_points: list[int]
+    ) -> _TMetrics:
+        """Computes detection performance metrics.
+
+        :param true_change_points: ground truth change points
+        :param detected_change_points: algorithm-detected change points
+        :return: dictionary containing precision, recall, F1, and error counts
+        """
+        true_positives = len(set(true_change_points) & set(detected_change_points))
+        false_positives = len(set(detected_change_points) - set(true_change_points))
+        false_negatives = len(set(true_change_points) - set(detected_change_points))
+
+        precision = (
+            true_positives / (true_positives + false_positives)
+            if (true_positives + false_positives) > 0 else 0.0
+        )
+        recall = (
+            true_positives / (true_positives + false_negatives)
+            if (true_positives + false_negatives) > 0 else 0.0
+        )
+        f1 = 2 * precision * recall / (precision + recall) if (precision + recall) > 0 else 0.0

         return {
             "precision": precision,
             "recall": recall,
-            "f1_score": f1_score,
-            "true_positive": true_positive,
-            "false_positive": false_positive,
-            "false_negative": false_negative,
+            "f1_score": f1,
+            "true_positive": true_positives,
+            "false_positive": false_positives,
+            "false_negative": false_negatives,
         }
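
For context on how the refactored base class is meant to be consumed, here is a minimal sketch (not part of this PR) of a concrete subclass and a usage run. The subclass name MeanGapAlgorithm, its mean-gap score, and the synthetic data are hypothetical illustrations; only the DensityBasedAlgorithm API shown in the diff above (constructor parameters, _compute_scores, detect, localize, evaluate_detection_accuracy) is taken from the changed file.

# Hypothetical example (not from the PR): exercises the refactored DensityBasedAlgorithm API.
import numpy as np
import numpy.typing as npt

from pysatl_cpd.core.algorithms.density.abstracts.density_based_algorithm import DensityBasedAlgorithm


class MeanGapAlgorithm(DensityBasedAlgorithm):
    """Toy subclass: scores each index by the gap between the means of the adjacent windows."""

    def _compute_scores(self, window: npt.NDArray[np.float64]) -> npt.NDArray[np.float64]:
        values = window[:, 0]  # _validate_window reshapes 1D input to a 2D (n, 1) array
        n = values.shape[0]
        w = self.min_window_size
        scores = np.zeros(n, dtype=np.float64)
        for i in range(w, n - w):
            left_mean = values[i - w:i].mean()
            right_mean = values[i:i + w].mean()
            # Scale by bandwidth so the score is comparable to self.threshold (default 1.1).
            scores[i] = abs(right_mean - left_mean) / self.bandwidth
        return scores


if __name__ == "__main__":
    rng = np.random.default_rng(0)
    data = np.concatenate([rng.normal(0.0, 1.0, 100), rng.normal(5.0, 1.0, 100)])

    algorithm = MeanGapAlgorithm(min_window_size=10, threshold=1.1, bandwidth=1.0)
    change_points = algorithm.localize(data)  # indices near the true change at 100
    print("localized:", change_points)
    print("count:", algorithm.detect(data))
    # Note: evaluate_detection_accuracy uses exact index matching.
    print(DensityBasedAlgorithm.evaluate_detection_accuracy([100], change_points))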