diff --git a/aeon/transformations/collection/dictionary_based/_paa.py b/aeon/transformations/collection/dictionary_based/_paa.py index 2cba574a1c..bf5d461e5c 100644 --- a/aeon/transformations/collection/dictionary_based/_paa.py +++ b/aeon/transformations/collection/dictionary_based/_paa.py @@ -3,8 +3,10 @@ __maintainer__ = [] import numpy as np +from numba import get_num_threads, njit, prange, set_num_threads from aeon.transformations.collection import BaseCollectionTransformer +from aeon.utils.validation import check_n_jobs class PAA(BaseCollectionTransformer): @@ -39,12 +41,14 @@ class PAA(BaseCollectionTransformer): _tags = { "capability:multivariate": True, + "capability:multithreading": True, "fit_is_empty": True, "algorithm_type": "dictionary", } - def __init__(self, n_segments=8): + def __init__(self, n_segments=8, n_jobs=1): self.n_segments = n_segments + self.n_jobs = n_jobs super().__init__() @@ -71,7 +75,6 @@ def _transform(self, X, y=None): # of segments is 3, the indices will be [0:3], [3:6] and [6:10] # so 3 segments, two of length 3 and one of length 4 split_segments = np.array_split(all_indices, self.n_segments) - # If the series length is divisible by the number of segments # then the transformation can be done in one line # If not, a for loop is needed only on the segments while @@ -82,13 +85,13 @@ def _transform(self, X, y=None): return X_paa else: - n_samples, n_channels, _ = X.shape - X_paa = np.zeros(shape=(n_samples, n_channels, self.n_segments)) - - for _s, segment in enumerate(split_segments): - if X[:, :, segment].shape[-1] > 0: # avoids mean of empty slice error - X_paa[:, :, _s] = X[:, :, segment].mean(axis=-1) - + prev_threads = get_num_threads() + _n_jobs = check_n_jobs(self.n_jobs) + set_num_threads(_n_jobs) + X_paa = _parallel_paa_transform( + X, n_segments=self.n_segments, split_segments=split_segments + ) + set_num_threads(prev_threads) return X_paa def inverse_paa(self, X, original_length): @@ -110,17 +113,17 @@ def inverse_paa(self, X, original_length): return np.repeat(X, repeats=int(original_length / self.n_segments), axis=-1) else: - n_samples, n_channels, _ = X.shape - X_inverse_paa = np.zeros(shape=(n_samples, n_channels, original_length)) - - all_indices = np.arange(original_length) - split_segments = np.array_split(all_indices, self.n_segments) - - for _s, segment in enumerate(split_segments): - X_inverse_paa[:, :, segment] = np.repeat( - X[:, :, [_s]], repeats=len(segment), axis=-1 - ) - + split_segments = np.array_split(np.arange(original_length), self.n_segments) + prev_threads = get_num_threads() + _n_jobs = check_n_jobs(self.n_jobs) + set_num_threads(_n_jobs) + X_inverse_paa = _parallel_inverse_paa_transform( + X, + original_length=original_length, + n_segments=self.n_segments, + split_segments=split_segments, + ) + set_num_threads(prev_threads) return X_inverse_paa @classmethod @@ -143,3 +146,40 @@ def _get_test_params(cls, parameter_set="default"): """ params = {"n_segments": 10} return params + + +@njit(parallel=True, cache=True, fastmath=True) +def _parallel_paa_transform(X, n_segments, split_segments): + """Parallelized PAA for uneven segment splits using Numba.""" + n_samples, n_channels, _ = X.shape + X_paa = np.zeros((n_samples, n_channels, n_segments), dtype=X.dtype) + + for _s in prange(n_segments): # Parallel over segments + segment = split_segments[_s] + seg_len = segment.shape[0] + + if seg_len == 0: + continue # skip empty segment + + for i in range(n_samples): + for j in range(n_channels): + X_paa[i, j, _s] = X[i, j, segment].mean() + + return X_paa + + +@njit(parallel=True, cache=True, fastmath=True) +def _parallel_inverse_paa_transform(X, original_length, n_segments, split_segments): + """Parallelize inverse PAA when series len % segments ≠ 0.""" + n_samples, n_channels, _ = X.shape + X_inverse_paa = np.zeros(shape=(n_samples, n_channels, original_length)) + + for _s in prange(n_segments): + segment = split_segments[_s] + for idx in prange(len(segment)): + t = segment[idx] + for i in prange(n_samples): + for j in prange(n_channels): + X_inverse_paa[i, j, t] = X[i, j, _s] + + return X_inverse_paa diff --git a/aeon/transformations/collection/dictionary_based/_sax.py b/aeon/transformations/collection/dictionary_based/_sax.py index 8200f804ad..37319123ac 100644 --- a/aeon/transformations/collection/dictionary_based/_sax.py +++ b/aeon/transformations/collection/dictionary_based/_sax.py @@ -167,7 +167,11 @@ def _get_sax_symbols(self, X_paa): sax_symbols : np.ndarray of shape = (n_cases, n_channels, n_segments) The output of the SAX transformation using np.digitize """ - sax_symbols = np.digitize(x=X_paa, bins=self.breakpoints) + prev_threads = get_num_threads() + _n_jobs = check_n_jobs(self.n_jobs) + set_num_threads(_n_jobs) + sax_symbols = _parallel_get_sax_symbols(X_paa, breakpoints=self.breakpoints) + set_num_threads(prev_threads) return sax_symbols def inverse_sax(self, X, original_length, y=None): @@ -292,3 +296,16 @@ def _invert_sax_symbols(sax_symbols, n_timepoints, breakpoints_mid): ] return sax_inverse + + +@njit(fastmath=True, cache=True, parallel=True) +def _parallel_get_sax_symbols(x, breakpoints, right=False): + """Parallel version using np.digitize within prange loop.""" + n_samples, n_channels, n_segments = x.shape + result = np.empty_like(x, dtype=np.intp) + + for i in prange(n_samples): + for c in range(n_channels): + result[i, c, :] = np.digitize(x[i, c, :], breakpoints, right=right) + + return result