Skip to content

Commit 419380c

Browse files
committed
New entropy-analysis methods have been added: shannon, bubble, condition, permutation, renyi, tsallis, KLD, dispersion, approximate, sample, slope. Tests have been developed and added for each of them.
1 parent 9f496f4 commit 419380c

25 files changed

+7243
-35
lines changed
Lines changed: 264 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,264 @@
1+
from collections import deque
2+
from typing import Optional
3+
4+
import numpy as np
5+
import numpy.typing as npt
6+
from scipy import stats
7+
8+
from pysatl_cpd.core.algorithms.online_algorithm import OnlineAlgorithm
9+
10+
11+
class KLDivergenceAlgorithm(OnlineAlgorithm):
    """Online change point detector based on the Kullback-Leibler divergence.

    The detector keeps two sliding buffers: a *reference* buffer and a
    *current* buffer that always holds the most recent observations.  Once
    both buffers are full, every new observation triggers an estimate of the
    KL divergence between the two samples (histogram based by default, or
    Gaussian-KDE based when ``use_kde`` is set).  A change point is reported
    when either

    * a single divergence value exceeds ``threshold``, or
    * the mean of the last ``_TREND_WINDOW`` divergence values measured
      against the current reference exceeds
      ``threshold * _TREND_THRESHOLD_RATIO``.

    After each reported change point the reference buffer is replaced by the
    contents of the current buffer.

    NOTE: every computed divergence is appended to an internal history that
    is only cleared by :meth:`reset`, so the history grows with the stream.
    """

    # Number of consecutive KL values averaged by the trend check.
    _TREND_WINDOW = 5
    # The averaged KL value must exceed this fraction of the threshold.
    _TREND_THRESHOLD_RATIO = 0.8
    # Relative padding added on both sides of the histogram range.
    _HISTOGRAM_MARGIN_RATIO = 0.01
    # Relative padding added on both sides of the KDE evaluation grid.
    _KDE_MARGIN_RATIO = 0.1
    # Number of points of the KDE evaluation grid.
    _KDE_GRID_SIZE = 1000

    def __init__(
        self,
        window_size: int = 100,
        reference_window_size: Optional[int] = None,
        threshold: float = 0.5,
        num_bins: int = 20,
        use_kde: bool = False,
        symmetric: bool = True,
        smoothing_factor: float = 1e-10,
    ):
        """Create a detector.

        Args:
            window_size: Capacity of the current (sliding) buffer.
            reference_window_size: Capacity of the reference buffer.  ``None``
                (or any other falsy value such as ``0``) means "same as
                ``window_size``".
            threshold: KL divergence above which a change point is reported.
            num_bins: Number of histogram bins used by the histogram
                estimator and by :meth:`analyze_distributions`.
            use_kde: Estimate densities with ``scipy.stats.gaussian_kde``
                instead of histograms.
            symmetric: Return the mean of KL(P||Q) and KL(Q||P) instead of
                KL(P||Q) alone.
            smoothing_factor: Constant added to every probability/density
                value before taking logarithms.

        Raises:
            ValueError: If a window size is not positive, ``num_bins <= 1``,
                ``threshold <= 0`` or ``smoothing_factor < 0``.
        """
        # Kept as ``or`` for backward compatibility: an explicit 0 falls back
        # to ``window_size`` exactly like ``None`` does.
        effective_reference_size = reference_window_size or window_size

        # Validate before any state is stored so a failed construction never
        # leaves a half-initialised object behind.
        if window_size <= 0 or effective_reference_size <= 0:
            raise ValueError("Window sizes must be positive")
        if num_bins <= 1:
            raise ValueError("Number of bins must be greater than 1")
        if threshold <= 0:
            raise ValueError("Threshold must be positive")
        if smoothing_factor < 0:
            # A negative value makes probabilities negative and every
            # divergence NaN, which would silently disable detection.
            raise ValueError("Smoothing factor must be non-negative")

        self._window_size = window_size
        self._reference_window_size = effective_reference_size
        self._threshold = threshold
        self._num_bins = num_bins
        self._use_kde = use_kde
        self._symmetric = symmetric
        self._smoothing_factor = smoothing_factor

        self._reference_buffer: deque[float] = deque(maxlen=self._reference_window_size)
        self._current_buffer: deque[float] = deque(maxlen=self._window_size)
        self._kl_values: list[float] = []
        self._position: int = 0
        self._last_change_point: Optional[int] = None
        # Set whenever the reference buffer is rebuilt; not read inside this
        # class (presumably inspected by callers or tests - TODO confirm).
        self._reference_updated: bool = False
        # Index into ``_kl_values`` of the first value measured against the
        # reference buffer that is currently in use.
        self._trend_start: int = 0

    def detect(self, observation: np.float64 | npt.NDArray[np.float64]) -> bool:
        """Feed one observation (or an array of them) into the detector.

        Args:
            observation: A scalar or an array of scalars, processed in order.

        Returns:
            ``True`` if a change point is pending, i.e. one has been recorded
            and not yet consumed by :meth:`localize` or cleared by
            :meth:`reset`; ``False`` otherwise.
        """
        # ``atleast_1d`` lets scalars, 0-d arrays and 1-d arrays share one
        # code path (iterating a 0-d array directly raises TypeError).
        for value in np.atleast_1d(observation):
            self._process_single_observation(float(value))

        return self._last_change_point is not None

    def localize(self, observation: np.float64 | npt.NDArray[np.float64]) -> Optional[int]:
        """Feed observation(s) and return the pending change point, if any.

        Args:
            observation: A scalar or an array of scalars, processed in order.

        Returns:
            The most recently recorded change point position, or ``None`` if
            nothing is pending.  Returning a position consumes it.  When an
            array produces several change points only the last one is kept.
        """
        if not self.detect(observation):
            return None

        change_point = self._last_change_point
        self._last_change_point = None
        return change_point

    def _process_single_observation(self, observation: float) -> None:
        """Update the buffers with one value and run the change point checks."""
        self._current_buffer.append(observation)
        self._position += 1

        # Fill the reference buffer first; nothing is compared until it is full.
        if len(self._reference_buffer) < self._reference_window_size:
            self._reference_buffer.append(observation)
            return

        if len(self._current_buffer) < self._window_size:
            return

        kl_divergence = self._calculate_kl_divergence()
        if not np.isfinite(kl_divergence):
            kl_divergence = 0.0
        self._kl_values.append(float(kl_divergence))

        if kl_divergence > self._threshold:
            self._register_change_point(self._window_size // 2)
            # Stop here: falling through to the trend check could overwrite
            # the change point just stored and rebuild the reference twice.
            return

        # Average only values measured against the current reference.  Values
        # from before the last reference update describe a different
        # comparison and would keep re-triggering after a single spike.
        first = max(self._trend_start, len(self._kl_values) - self._TREND_WINDOW)
        recent_kl = self._kl_values[first:]
        if len(recent_kl) < self._TREND_WINDOW:
            return
        if float(np.mean(recent_kl)) > self._threshold * self._TREND_THRESHOLD_RATIO:
            self._register_change_point(self._window_size // 4)

    def _register_change_point(self, offset: int) -> None:
        """Store a change point ``offset`` steps back and rebuild the reference."""
        self._last_change_point = self._position - offset
        self._update_reference_distribution()

    def _buffers_ready(self) -> bool:
        """Return whether both buffers are filled to their capacity."""
        return (
            len(self._reference_buffer) >= self._reference_window_size
            and len(self._current_buffer) >= self._window_size
        )

    def _buffer_arrays(self) -> tuple[npt.NDArray[np.float64], npt.NDArray[np.float64]]:
        """Return ``(reference, current)`` buffer contents as float arrays."""
        reference = np.fromiter(self._reference_buffer, dtype=np.float64, count=len(self._reference_buffer))
        current = np.fromiter(self._current_buffer, dtype=np.float64, count=len(self._current_buffer))
        return reference, current

    def _calculate_kl_divergence(self) -> float:
        """Estimate the divergence between the reference and current buffers."""
        reference_data, current_data = self._buffer_arrays()

        if self._use_kde:
            return self._calculate_kl_divergence_kde(reference_data, current_data)
        return self._calculate_kl_divergence_histogram(reference_data, current_data)

    def _divergence(
        self, p: npt.NDArray[np.float64], q: npt.NDArray[np.float64], step: float = 1.0
    ) -> float:
        """Return KL(p||q), or its average with KL(q||p) in symmetric mode.

        ``step`` is the integration step: 1 for probability masses, the grid
        spacing for densities sampled on a uniform grid.
        """
        kl_pq = float(np.sum(p * np.log(p / q)) * step)
        if not self._symmetric:
            return kl_pq

        kl_qp = float(np.sum(q * np.log(q / p)) * step)
        return (kl_pq + kl_qp) / 2

    def _smoothed_probabilities(self, counts: npt.NDArray[np.int64]) -> npt.NDArray[np.float64]:
        """Turn bin counts into smoothed probabilities that sum to one."""
        probabilities = counts.astype(np.float64)
        total = probabilities.sum()
        if total > 0:
            probabilities = probabilities / total

        # Smoothing keeps empty bins away from log(0) and division by zero.
        probabilities = probabilities + self._smoothing_factor
        return probabilities / probabilities.sum()

    def _calculate_kl_divergence_histogram(
        self, ref_data: npt.NDArray[np.float64], curr_data: npt.NDArray[np.float64]
    ) -> float:
        """Estimate the divergence from histograms over a shared set of bins.

        Args:
            ref_data: Reference sample.
            curr_data: Current sample.

        Returns:
            The (optionally symmetrised) KL divergence; ``0.0`` when both
            samples consist of one and the same repeated value.
        """
        data_min = min(np.min(ref_data), np.min(curr_data))
        data_max = max(np.max(ref_data), np.max(curr_data))

        if data_max == data_min:
            # Zero-width bins would yield NaN; identical constant samples
            # have identical distributions, hence zero divergence.
            return 0.0

        margin = (data_max - data_min) * self._HISTOGRAM_MARGIN_RATIO
        bin_edges = np.linspace(data_min - margin, data_max + margin, self._num_bins + 1)

        # Plain counts suffice: with equal-width bins, normalised densities
        # and normalised counts are the same probabilities.
        ref_counts, _ = np.histogram(ref_data, bins=bin_edges)
        curr_counts, _ = np.histogram(curr_data, bins=bin_edges)

        ref_prob = self._smoothed_probabilities(ref_counts)
        curr_prob = self._smoothed_probabilities(curr_counts)

        return self._divergence(ref_prob, curr_prob)

    def _calculate_kl_divergence_kde(
        self, ref_data: npt.NDArray[np.float64], curr_data: npt.NDArray[np.float64]
    ) -> float:
        """Estimate the divergence from Gaussian kernel density estimates.

        Falls back to the histogram estimator when a KDE cannot be built
        (a sample with zero spread or with fewer than two points).

        Args:
            ref_data: Reference sample.
            curr_data: Current sample.

        Returns:
            The (optionally symmetrised) KL divergence.
        """
        if np.ptp(ref_data) == 0 or np.ptp(curr_data) == 0:
            return self._calculate_kl_divergence_histogram(ref_data, curr_data)

        try:
            ref_kde = stats.gaussian_kde(ref_data)
            curr_kde = stats.gaussian_kde(curr_data)
        except (np.linalg.LinAlgError, ValueError):
            # Singular covariance or too few points: KDE is undefined.
            return self._calculate_kl_divergence_histogram(ref_data, curr_data)

        data_min = min(np.min(ref_data), np.min(curr_data))
        data_max = max(np.max(ref_data), np.max(curr_data))
        margin = (data_max - data_min) * self._KDE_MARGIN_RATIO
        x_eval = np.linspace(data_min - margin, data_max + margin, self._KDE_GRID_SIZE)

        ref_density = ref_kde(x_eval) + self._smoothing_factor
        curr_density = curr_kde(x_eval) + self._smoothing_factor

        # Renormalise so that each density integrates to one over the grid.
        dx = x_eval[1] - x_eval[0]
        ref_density = ref_density / (np.sum(ref_density) * dx)
        curr_density = curr_density / (np.sum(curr_density) * dx)

        return self._divergence(ref_density, curr_density, dx)

    def _update_reference_distribution(self) -> None:
        """Replace the reference buffer with the contents of the current one."""
        self._reference_buffer.clear()
        # The deque's ``maxlen`` keeps only the newest values if the current
        # buffer is larger than the reference buffer.
        self._reference_buffer.extend(self._current_buffer)
        self._reference_updated = True
        # KL values recorded so far refer to the previous reference.
        self._trend_start = len(self._kl_values)

    def get_kl_history(self) -> list[float]:
        """Return a copy of all divergence values computed since the last reset."""
        return self._kl_values.copy()

    def get_current_parameters(self) -> dict:
        """Return the current configuration as a name-to-value dictionary."""
        return {
            "window_size": self._window_size,
            "reference_window_size": self._reference_window_size,
            "threshold": self._threshold,
            "num_bins": self._num_bins,
            "use_kde": self._use_kde,
            "symmetric": self._symmetric,
            "smoothing_factor": self._smoothing_factor,
        }

    def set_parameters(
        self,
        threshold: Optional[float] = None,
        num_bins: Optional[int] = None,
        use_kde: Optional[bool] = None,
        symmetric: Optional[bool] = None,
        smoothing_factor: Optional[float] = None,
    ) -> None:
        """Update selected parameters; arguments left as ``None`` are unchanged.

        Parameters are applied in signature order, so values preceding an
        invalid one are already stored when the exception is raised.

        Raises:
            ValueError: If ``threshold <= 0``, ``num_bins <= 1`` or
                ``smoothing_factor < 0``.
        """
        if threshold is not None:
            if threshold <= 0:
                raise ValueError("Threshold must be positive")
            self._threshold = threshold
        if num_bins is not None:
            if num_bins <= 1:
                raise ValueError("Number of bins must be greater than 1")
            self._num_bins = num_bins
        if use_kde is not None:
            self._use_kde = use_kde
        if symmetric is not None:
            self._symmetric = symmetric
        if smoothing_factor is not None:
            if smoothing_factor < 0:
                raise ValueError("Smoothing factor must be non-negative")
            self._smoothing_factor = smoothing_factor

    def get_distribution_comparison(self) -> dict:
        """Compare the reference and current buffers with summary statistics.

        Returns:
            An empty dictionary while either buffer is not yet full.
            Otherwise a dictionary with the KL divergence, the mean and
            standard deviation of both samples, their absolute mean
            difference, the ratio of standard deviations (``inf`` when the
            reference standard deviation is zero) and the two-sample
            Kolmogorov-Smirnov statistic and p-value.
        """
        if not self._buffers_ready():
            return {}

        ref_data, curr_data = self._buffer_arrays()

        ref_mean, ref_std = np.mean(ref_data), np.std(ref_data)
        curr_mean, curr_std = np.mean(curr_data), np.std(curr_data)

        kl_div = self._calculate_kl_divergence()
        ks_statistic, ks_pvalue = stats.ks_2samp(ref_data, curr_data)

        return {
            "kl_divergence": kl_div,
            "reference_mean": ref_mean,
            "reference_std": ref_std,
            "current_mean": curr_mean,
            "current_std": curr_std,
            "mean_difference": abs(curr_mean - ref_mean),
            "std_ratio": curr_std / ref_std if ref_std > 0 else float("inf"),
            "ks_statistic": ks_statistic,
            "ks_pvalue": ks_pvalue,
        }

    def analyze_distributions(self) -> dict:
        """Extend :meth:`get_distribution_comparison` with entropy and quartiles.

        Returns:
            An empty dictionary while either buffer is not yet full.
            Otherwise the comparison dictionary plus the histogram entropy of
            each sample, their absolute difference, the 0.25/0.5/0.75
            quantiles of each sample and the absolute quantile differences.
        """
        if not self._buffers_ready():
            return {}

        ref_data, curr_data = self._buffer_arrays()
        comparison = self.get_distribution_comparison()

        # Each sample is binned over its own range here, unlike the shared
        # bins used for the KL estimate.
        ref_entropy = stats.entropy(np.histogram(ref_data, bins=self._num_bins)[0] + self._smoothing_factor)
        curr_entropy = stats.entropy(np.histogram(curr_data, bins=self._num_bins)[0] + self._smoothing_factor)

        quantiles = [0.25, 0.5, 0.75]
        ref_quantiles = np.quantile(ref_data, quantiles)
        curr_quantiles = np.quantile(curr_data, quantiles)

        return {
            **comparison,
            "reference_entropy": ref_entropy,
            "current_entropy": curr_entropy,
            "entropy_difference": abs(curr_entropy - ref_entropy),
            "reference_quantiles": ref_quantiles.tolist(),
            "current_quantiles": curr_quantiles.tolist(),
            "quantile_differences": (np.abs(curr_quantiles - ref_quantiles)).tolist(),
        }

    def reset(self) -> None:
        """Clear both buffers, the KL history and any pending change point."""
        self._reference_buffer.clear()
        self._current_buffer.clear()
        self._kl_values.clear()
        self._position = 0
        self._last_change_point = None
        self._reference_updated = False
        self._trend_start = 0

    def force_reference_update(self) -> None:
        """Rebuild the reference from the current buffer if the latter is full."""
        if len(self._current_buffer) >= self._window_size:
            self._update_reference_distribution()

pysatl_cpd/core/algorithms/entropies/__init__.py

Whitespace-only changes.

0 commit comments

Comments
 (0)