Commit 01ade80

Fix: fixed non-strict typing errors
1 parent 419380c commit 01ade80

7 files changed: +62 -55 lines changed

pysatl_cpd/core/algorithms/entropies/KLDivergence_entropy.py

Lines changed: 18 additions & 18 deletions

@@ -102,8 +102,8 @@ def _calculate_kl_divergence(self) -> float:
     def _calculate_kl_divergence_histogram(
         self, ref_data: npt.NDArray[np.float64], curr_data: npt.NDArray[np.float64]
     ) -> float:
-        data_min = min(np.min(ref_data), np.min(curr_data))
-        data_max = max(np.max(ref_data), np.max(curr_data))
+        data_min = float(np.min(ref_data))
+        data_max = float(np.max(ref_data))

         margin = (data_max - data_min) * 0.01
         bin_edges = np.linspace(data_min - margin, data_max + margin, self._num_bins + 1)

@@ -120,39 +120,39 @@ def _calculate_kl_divergence_histogram(
         ref_prob = ref_prob / np.sum(ref_prob)
         curr_prob = curr_prob / np.sum(curr_prob)

-        kl_pq = np.sum(ref_prob * np.log(ref_prob / curr_prob))
+        kl_pq = float(np.sum(ref_prob * np.log(ref_prob / curr_prob)))

         if self._symmetric:
-            kl_qp = np.sum(curr_prob * np.log(curr_prob / ref_prob))
+            kl_qp = float(np.sum(curr_prob * np.log(curr_prob / ref_prob)))
             return (kl_pq + kl_qp) / 2
         else:
             return kl_pq

     def _calculate_kl_divergence_kde(
         self, ref_data: npt.NDArray[np.float64], curr_data: npt.NDArray[np.float64]
     ) -> float:
-        ref_kde = stats.gaussian_kde(ref_data)
-        curr_kde = stats.gaussian_kde(curr_data)
-
-        data_min = min(np.min(ref_data), np.min(curr_data))
-        data_max = max(np.max(ref_data), np.max(curr_data))
+        data_min = float(np.min(np.array([ref_data.min(), curr_data.min()])))
+        data_max = float(np.max(np.array([ref_data.max(), curr_data.max()])))
         margin = (data_max - data_min) * 0.1
         x_eval = np.linspace(data_min - margin, data_max + margin, 1000)

+        ref_kde = stats.gaussian_kde(ref_data)
+        curr_kde = stats.gaussian_kde(curr_data)
+
         ref_density = ref_kde(x_eval)
         curr_density = curr_kde(x_eval)

-        ref_density = ref_density + self._smoothing_factor
-        curr_density = curr_density + self._smoothing_factor
+        ref_density += self._smoothing_factor
+        curr_density += self._smoothing_factor

         dx = x_eval[1] - x_eval[0]
-        ref_density = ref_density / (np.sum(ref_density) * dx)
-        curr_density = curr_density / (np.sum(curr_density) * dx)
+        ref_density /= np.sum(ref_density) * dx
+        curr_density /= np.sum(curr_density) * dx

-        kl_pq = np.sum(ref_density * np.log(ref_density / curr_density)) * dx
+        kl_pq = float(np.sum(ref_density * np.log(ref_density / curr_density)) * dx)

         if self._symmetric:
-            kl_qp = np.sum(curr_density * np.log(curr_density / ref_density)) * dx
+            kl_qp = float(np.sum(curr_density * np.log(curr_density / ref_density)) * dx)
             return (kl_pq + kl_qp) / 2
         else:
             return kl_pq

@@ -166,7 +166,7 @@ def _update_reference_distribution(self) -> None:
     def get_kl_history(self) -> list[float]:
         return self._kl_values.copy()

-    def get_current_parameters(self) -> dict:
+    def get_current_parameters(self) -> dict[str, float | int | bool]:
         return {
             "window_size": self._window_size,
             "reference_window_size": self._reference_window_size,

@@ -200,7 +200,7 @@ def set_parameters(
         if smoothing_factor is not None:
            self._smoothing_factor = smoothing_factor

-    def get_distribution_comparison(self) -> dict:
+    def get_distribution_comparison(self) -> dict[str, float]:
         if len(self._reference_buffer) < self._reference_window_size or len(self._current_buffer) < self._window_size:
             return {}

@@ -225,7 +225,7 @@ def get_distribution_comparison(self) -> dict:
             "ks_pvalue": ks_pvalue,
         }

-    def analyze_distributions(self) -> dict:
+    def analyze_distributions(self) -> dict[str, float]:
         if len(self._reference_buffer) < self._reference_window_size or len(self._current_buffer) < self._window_size:
             return {}
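
A note on the recurring float(...) wrappers in this commit: under mypy --strict, reductions like np.sum come back as NumPy scalar types (np.floating[Any] in many numpy stub versions) rather than builtin float, so they do not satisfy a declared -> float return. A minimal sketch of the pattern, with illustrative names that are not from the repository:

    import numpy as np
    import numpy.typing as npt


    def kl_divergence(p: npt.NDArray[np.float64], q: npt.NDArray[np.float64]) -> float:
        # np.sum returns a NumPy scalar; float() converts it so the
        # annotated -> float return type checks under mypy --strict.
        return float(np.sum(p * np.log(p / q)))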

pysatl_cpd/core/algorithms/entropies/approximate_entropy.py

Lines changed: 3 additions & 1 deletion

@@ -85,11 +85,13 @@ def _calculate_approximate_entropy(self, time_series: npt.NDArray[np.float64]) -> float:

         r = self._r
         if r is None:
-            std_dev = np.std(time_series)
+            std_dev = float(np.std(time_series))
             if std_dev == 0:
                 return 0.0
             r = self._r_factor * std_dev

+        assert r is not None
+
         phi_m = self._calculate_phi(time_series, self._m, r)
         phi_m_plus_1 = self._calculate_phi(time_series, self._m + 1, r)
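
The added assert is the usual idiom for narrowing Optional[float] to float before the value reaches parameters annotated as plain float. A hedged sketch of the same shape, with illustrative names:

    from typing import Optional


    def pick_tolerance(r: Optional[float], std_dev: float, r_factor: float = 0.2) -> float:
        if r is None:
            r = r_factor * std_dev
        # The assert states the non-None invariant explicitly, so the
        # type checker treats r as float from here on.
        assert r is not None
        return r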

pysatl_cpd/core/algorithms/entropies/dispersion_entropy.py

Lines changed: 10 additions & 10 deletions

@@ -1,5 +1,5 @@
 from collections import deque
-from typing import Optional
+from typing import Optional, Union

 import numpy as np
 import numpy.typing as npt

@@ -125,7 +125,7 @@ def _discretize_to_classes(self, y_series: npt.NDArray[np.float64]) -> npt.NDArray[np.int32]:
         z_series = np.clip(z_series, 1, self._num_classes)
         return z_series

-    def _create_dispersion_patterns(self, z_series: npt.NDArray[np.int32]) -> list[tuple]:
+    def _create_dispersion_patterns(self, z_series: npt.NDArray[np.int32]) -> list[tuple[int, ...]]:
         N = len(z_series)
         patterns = []

@@ -140,22 +140,22 @@ def _create_dispersion_patterns(self, z_series: npt.NDArray[np.int32]) -> list[tuple]:

         return patterns

-    def _calculate_pattern_probabilities(self, patterns: list[tuple]) -> dict[tuple, float]:
+    def _calculate_pattern_probabilities(self, patterns: list[tuple[int, ...]]) -> dict[tuple[int, ...], float]:
         if not patterns:
             return {}

-        pattern_counts = {}
+        pattern_counts: dict[tuple[int, ...], int] = {}
         for pattern in patterns:
             pattern_counts[pattern] = pattern_counts.get(pattern, 0) + 1

         total_patterns = len(patterns)
-        pattern_probs = {}
+        pattern_probs: dict[tuple[int, ...], float] = {}
         for pattern, count in pattern_counts.items():
             pattern_probs[pattern] = count / total_patterns

         return pattern_probs

-    def _calculate_shannon_entropy(self, pattern_probs: dict[tuple, float]) -> float:
+    def _calculate_shannon_entropy(self, pattern_probs: dict[tuple[int, ...], float]) -> float:
         if not pattern_probs:
             return 0.0

@@ -169,7 +169,7 @@ def _calculate_shannon_entropy(self, pattern_probs: dict[tuple, float]) -> float:
     def get_entropy_history(self) -> list[float]:
         return self._entropy_values.copy()

-    def get_current_parameters(self) -> dict:
+    def get_current_parameters(self) -> dict[str, int | float | bool]:
         return {
             "window_size": self._window_size,
             "embedding_dim": self._embedding_dim,

@@ -204,7 +204,7 @@ def set_parameters(
                 f"c^w ({self._num_classes}^{self._embedding_dim}) should be less than window_size ({self._window_size})"
             )

-    def get_pattern_distribution(self) -> dict[tuple, int]:
+    def get_pattern_distribution(self) -> dict[tuple[int, ...], int]:
         if len(self._buffer) < self._window_size:
             return {}

@@ -218,13 +218,13 @@ def get_pattern_distribution(self) -> dict[tuple, int]:
         z_series = self._discretize_to_classes(y_series)
         patterns = self._create_dispersion_patterns(z_series)

-        pattern_counts = {}
+        pattern_counts: dict[tuple[int, ...], int] = {}
         for pattern in patterns:
             pattern_counts[pattern] = pattern_counts.get(pattern, 0) + 1

         return pattern_counts

-    def analyze_complexity(self) -> dict:
+    def analyze_complexity(self) -> dict[str, Union[float, int]]:
         if len(self._buffer) < self._window_size:
             return {}
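
The fixes in this file all replace bare dict and tuple with parameterized generics; an empty {} literal also typically needs an explicit annotation, since mypy cannot infer key and value types from it. A small self-contained sketch, illustrative rather than repository code:

    def count_patterns(patterns: list[tuple[int, ...]]) -> dict[tuple[int, ...], int]:
        # The annotation on counts is required: a bare {} is untyped
        # under strict mode.
        counts: dict[tuple[int, ...], int] = {}
        for pattern in patterns:
            counts[pattern] = counts.get(pattern, 0) + 1
        return counts


    print(count_patterns([(1, 2), (1, 2), (2, 3)]))  # {(1, 2): 2, (2, 3): 1}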

pysatl_cpd/core/algorithms/entropies/renyi_entropy.py

Lines changed: 2 additions & 1 deletion

@@ -112,7 +112,8 @@ def _compute_probabilities(self, time_series: npt.NDArray[np.float64]) -> list[float]:

         probabilities = []
         for i in range(1, len(bin_edges)):
-            count = bin_counts.get(i, 0)
+            key = np.int64(i)
+            count = bin_counts.get(key, 0)
             prob = count / total_count if total_count > 0 else 0.0
             probabilities.append(prob)
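
The np.int64(i) cast suggests bin_counts is keyed by NumPy integers (as happens when a dict is built from np.unique output), so a plain int key fails strict key-type checking even though both hash identically at runtime. An assumed reconstruction, not taken from the repository:

    import numpy as np

    values = np.array([1, 1, 2, 3, 3, 3])
    uniques, counts = np.unique(values, return_counts=True)
    bin_counts = dict(zip(uniques, counts))  # keys are np.int64, not int

    key = np.int64(2)
    print(bin_counts.get(key, 0))  # matches the dict's inferred key type; prints 1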

pysatl_cpd/core/algorithms/entropies/sample_entropy.py

Lines changed: 5 additions & 6 deletions

@@ -95,21 +95,20 @@ def _calculate_sample_entropy(self, time_series: npt.NDArray[np.float64]) -> float:

         r = self._r
         if r is None:
-            std_dev = np.std(time_series)
+            std_dev = float(np.std(time_series))
             if std_dev == 0:
                 return float("inf")
             r = self._r_factor * std_dev

+        assert r is not None
+
         B = self._count_matches(time_series, self._m, r)
         A = self._count_matches(time_series, self._m + 1, r)

-        if B == 0:
-            return float("inf")
-        if A == 0:
+        if B == 0 or A == 0:
             return float("inf")

-        sample_entropy = -np.log(A / B)
-        return float(sample_entropy)
+        return float(-np.log(A / B))

     def _count_matches(self, time_series: npt.NDArray[np.float64], m: int, r: float) -> int:
         N = len(time_series)
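
For context, the branches above implement the standard sample-entropy definition SampEn(m, r) = -ln(A / B), where B counts template matches of length m and A those of length m + 1 within tolerance r. Merging the two zero checks into one condition changes nothing, since both branches returned float("inf").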

pysatl_cpd/core/algorithms/entropies/slope_entropy.py

Lines changed: 6 additions & 4 deletions

@@ -1,5 +1,5 @@
 from collections import deque
-from typing import Optional
+from typing import Any, Optional, Union

 import numpy as np
 import numpy.typing as npt

@@ -141,7 +141,7 @@ def _create_slope_pattern(self, subsequence: npt.NDArray[np.float64]) -> list[int]:
     def get_entropy_history(self) -> list[float]:
         return self._entropy_values.copy()

-    def get_current_parameters(self) -> dict:
+    def get_current_parameters(self) -> dict[str, Any]:
         return {
             "window_size": self._window_size,
             "embedding_dim": self._embedding_dim,

@@ -196,7 +196,7 @@ def get_pattern_distribution(self) -> dict[tuple[int, ...], float]:

         return pattern_probs

-    def analyze_slope_characteristics(self) -> dict:
+    def analyze_slope_characteristics(self) -> dict[str, Any]:
         if len(self._buffer) < self._window_size:
             return {}

@@ -232,7 +232,9 @@ def get_symbol_meanings(self) -> dict[int, str]:
             -2: f"Steep negative slope (< -{self._gamma})",
         }

-    def demonstrate_encoding(self, sample_data: list[float]) -> dict:
+    def demonstrate_encoding(
+        self, sample_data: list[float]
+    ) -> dict[str, Union[str, float, int, list[float], list[int], list[list[int]], dict[int, str]]]:
         if len(sample_data) < self._embedding_dim:
             return {"error": "Sample data too short"}
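
This file settles on dict[str, Any] or an explicit Union for heterogeneous return dicts. An alternative the commit does not use, noted only for comparison, is a TypedDict, which checks a value type per key; field names below are illustrative:

    from typing import TypedDict


    class EncodingDemo(TypedDict):
        input_data: list[float]
        patterns: list[list[int]]
        entropy: float


    def demonstrate() -> EncodingDemo:
        # Each key gets its own checked value type instead of one broad
        # Union shared by every key.
        return {"input_data": [1.0, 2.0, 3.0], "patterns": [[1, 0]], "entropy": 0.0}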

pysatl_cpd/core/algorithms/entropies/tsallis_entropy.py

Lines changed: 18 additions & 15 deletions

@@ -1,8 +1,9 @@
 from collections import deque
-from typing import Optional
+from typing import Any, Optional, Union

 import numpy as np
 import numpy.typing as npt
+from numpy.typing import NDArray
 from scipy import stats
 from scipy.integrate import quad

@@ -143,19 +144,21 @@ def _calculate_continuous_tsallis_entropy(self, time_series: npt.NDArray[np.float64]
         data_min, data_max = np.min(time_series), np.max(time_series)
         margin = (data_max - data_min) * 0.2

-        def integrand(x):
-            p_x = kde(x)[0] if np.isscalar(x) else kde(x)
-            return p_x**q
+        def integrand(x: Union[float, NDArray[np.float64]]) -> float:
+            x_array: NDArray[np.float64] = np.atleast_1d(x).astype(np.float64)
+            p_x = kde(x_array)[0]
+            return float(p_x**q)

         integral_result, _ = quad(
             integrand, data_min - margin, data_max + margin, limit=100, epsabs=1e-8, epsrel=1e-8
         )
         v = 1e-10
         if abs(q - 1.0) < v:

-            def shannon_integrand(x):
-                p_x = kde(x)[0] if np.isscalar(x) else kde(x)
-                return p_x * np.log(p_x + 1e-10)
+            def shannon_integrand(x: Union[float, NDArray[np.float64]]) -> float:
+                x_array: NDArray[np.float64] = np.atleast_1d(x).astype(np.float64)
+                p_x = kde(x_array)[0]
+                return float(p_x * np.log(p_x + 1e-10))

             shannon_integral, _ = quad(
                 shannon_integrand, data_min - margin, data_max + margin, limit=100, epsabs=1e-8, epsrel=1e-8

@@ -180,9 +183,9 @@ def _calculate_max_discrete_tsallis_entropy(self, n_states: int, q: float) -> float:
         p_uniform = 1.0 / n_states

         if abs(q - 1.0) < v:
-            return self._k_constant * np.log(n_states)
+            return float(self._k_constant * np.log(n_states))
         else:
-            return self._k_constant * (1.0 / (q - 1.0)) * (1.0 - n_states * (p_uniform**q))
+            return float(self._k_constant * (1.0 / (q - 1.0)) * (1.0 - n_states * (p_uniform**q)))

     def _combine_multi_q_entropies(self, entropies: dict[float, float]) -> float:
         weights = {}

@@ -245,7 +248,7 @@ def get_entropy_history(self) -> list[float]:
     def get_multi_q_history(self) -> dict[float, list[float]]:
         return {q: values.copy() for q, values in self._multi_entropy_values.items()}

-    def get_current_parameters(self) -> dict:
+    def get_current_parameters(self) -> dict[str, Any]:
         return {
             "window_size": self._window_size,
             "q_parameter": self._q_parameter,

@@ -268,20 +271,20 @@ def set_parameters(
         normalize: Optional[bool] = None,
         multi_q: Optional[bool] = None,
     ) -> None:
-        def set_q_param(q):
+        def set_q_param(q: float) -> None:
             v = 1e-10
             if abs(q - 1.0) < v:
                 raise ValueError("q parameter cannot be 1")
             self._q_parameter = q
             if not self._multi_q:
                 self._q_values = [q]

-        def set_k_const(k):
+        def set_k_const(k: float) -> None:
             if k <= 0:
                 raise ValueError("k constant must be positive")
             self._k_constant = k

-        def set_num_bins_func(bins):
+        def set_num_bins_func(bins: int) -> None:
             if bins <= 1:
                 raise ValueError("Number of bins must be greater than 1")
             self._num_bins = bins

@@ -312,7 +315,7 @@ def set_num_bins_func(bins):
         else:
             self._q_values = [self._q_parameter]

-    def analyze_q_sensitivity(self) -> dict:
+    def analyze_q_sensitivity(self) -> dict[str, Any]:
         if len(self._buffer) < self._window_size:
             return {}

@@ -341,7 +344,7 @@ def analyze_q_sensitivity(self) -> dict:
             "entropy_variance_across_q": np.var(list(q_entropies.values())) if q_entropies else 0,
         }

-    def get_complexity_metrics(self) -> dict:
+    def get_complexity_metrics(self) -> dict[str, Any]:
         if len(self._buffer) < self._window_size:
             return {}
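
The typed integrands replace an np.isscalar runtime branch with np.atleast_1d, so one annotated signature accepts both the Python float that scipy.integrate.quad actually passes and arrays. A runnable sketch of the pattern under those assumptions, with made-up data:

    from typing import Union

    import numpy as np
    from numpy.typing import NDArray
    from scipy import stats
    from scipy.integrate import quad

    kde = stats.gaussian_kde(np.random.default_rng(0).normal(size=200))
    q = 2.0


    def integrand(x: Union[float, NDArray[np.float64]]) -> float:
        # quad passes a Python float; np.atleast_1d lifts it to a
        # 1-element array so kde() always sees the same input type.
        x_array: NDArray[np.float64] = np.atleast_1d(x).astype(np.float64)
        p_x = kde(x_array)[0]
        return float(p_x**q)


    integral, _ = quad(integrand, -5.0, 5.0, limit=100)
    print(integral)  # approximates the integral of p(x)**2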
