feat: preprocessing module #87
base: refactor/rework-arch
Conversation
```python
bmin = 20
bmax = 150

h = 1 * iqr(X) * n ** (-1 / 3)
```
Shouldn't this be h = 2 * IQR * n^(-1/3), straight from the original Freedman-Diaconis rule?
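A minimal sketch of the corrected bin width, assuming `X` is a 1-D sample and `iqr` is `scipy.stats.iqr` as in the diff (the helper name `fd_bin_width` is hypothetical):

```python
import numpy as np
from scipy.stats import iqr

def fd_bin_width(X: np.ndarray) -> float:
    """Freedman-Diaconis bin width: h = 2 * IQR(X) * n^(-1/3)."""
    n = len(X)
    return 2 * iqr(X) * n ** (-1 / 3)
```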
VITYANA left a comment
Add more docstrings; it's hard, at least for me, to understand some parts of your code right now 😇
```python
def _get_peaks(hist: np.ndarray, is_valleys: bool = False) -> list[np.ndarray]:
    hist_prep = np.concatenate((np.zeros(1), hist, np.zeros(1)))
    if not is_valleys:
        peaks, _ = find_peaks(hist_prep)
        return [hist_prep, peaks]

    valleys, _ = find_peaks(-hist_prep)
    return [hist_prep, valleys]
```
Suggested change:

```python
def _get_peaks(hist: np.ndarray, is_valleys: bool = False) -> tuple[np.ndarray, np.ndarray]:
    hist_prep = np.concatenate((np.zeros(1), hist, np.zeros(1)))
    data_to_search = -hist_prep if is_valleys else hist_prep
    indices, _ = find_peaks(data_to_search)
    return (hist_prep, indices)
```
hist = hist / np.sum(hist) is repeated too many times; maybe we can fix that somehow, by adding a decorator or something.
Also, I think it would be good to add some docstrings.
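A minimal sketch of such a decorator (the name `normalize_hist` is hypothetical):

```python
import functools

import numpy as np

def normalize_hist(score_fn):
    """Normalize the histogram to sum to 1 before calling the wrapped score method."""
    @functools.wraps(score_fn)
    def wrapper(self, hist: np.ndarray) -> float:
        total = np.sum(hist)
        if total > 0:
            hist = hist / total
        return score_fn(self, hist)
    return wrapper
```

Each `score` method could then drop its own `hist = hist / np.sum(hist)` line and be decorated with `@normalize_hist` instead.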
```python
class CSobelCount(AHistClassifierCriterion):
    def __init__(self, threshold: float = 0.01) -> None:
        self.threshold = threshold

    @property
    def name(self) -> str:
        return "Sobel Count Criterion"

    def score(self, hist: np.ndarray) -> float:
        hist = hist / np.sum(hist)

        sob = sobel(hist)
        return np.mean(np.abs(sob) > np.max(np.abs(sob)) * self.threshold)


class CSobelMax(AHistClassifierCriterion):
    def __init__(self, threshold: float = 0.01) -> None:
        self.threshold = threshold

    @property
    def name(self) -> str:
        return "Sobel Max Criterion"

    def score(self, hist: np.ndarray) -> float:
        hist = hist / np.sum(hist)

        sob = sobel(hist)
        return np.max(np.abs(sob))


class CSobelMean(AHistClassifierCriterion):
    def __init__(self, threshold: float = 0.01) -> None:
        self.threshold = threshold

    @property
    def name(self) -> str:
        return "Sobel Mean Criterion"

    def score(self, hist: np.ndarray) -> float:
        hist = hist / np.sum(hist)

        sob = sobel(hist)
        return np.mean(np.abs(sob))


class CSobelMin(AHistClassifierCriterion):
    def __init__(self, threshold: float = 0.01) -> None:
        self.threshold = threshold

    @property
    def name(self) -> str:
        return "Sobel Min Criterion"

    def score(self, hist: np.ndarray) -> float:
        hist = hist / np.sum(hist)

        sob = sobel(hist)
        return np.min(np.abs(sob))
```
threshold is only used in CSobelCount, and sob = sobel(hist) is duplicated. Maybe make a general class CSobelFeature and give it an agg_mode: str = "mean" parameter or something like that, so that CSobelMean() becomes CSobelFeature(agg_mode="mean"), CSobelMax() becomes CSobelFeature(agg_mode="max"), CSobelMin() becomes CSobelFeature(agg_mode="min"), and CSobelCount() becomes CSobelFeature(agg_mode="count", count_threshold=0.01).
In any case, delete the unused threshold.
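A minimal sketch of what CSobelFeature could look like, assuming `AHistClassifierCriterion` and `sobel` are the same ones already used in the diff above:

```python
import numpy as np

class CSobelFeature(AHistClassifierCriterion):
    """Single criterion covering count/max/mean/min aggregation over |sobel(hist)|."""

    _AGG_FUNCS = {"max": np.max, "mean": np.mean, "min": np.min}

    def __init__(self, agg_mode: str = "mean", count_threshold: float = 0.01) -> None:
        if agg_mode not in ("count", "max", "mean", "min"):
            raise ValueError("agg_mode must be 'count', 'max', 'mean', or 'min'")
        self.agg_mode = agg_mode
        self.count_threshold = count_threshold  # only used when agg_mode == "count"

    @property
    def name(self) -> str:
        return f"Sobel {self.agg_mode.capitalize()} Criterion"

    def score(self, hist: np.ndarray) -> float:
        hist = hist / np.sum(hist)
        sob = np.abs(sobel(hist))  # sobel computed exactly once
        if self.agg_mode == "count":
            return float(np.mean(sob > np.max(sob) * self.count_threshold))
        return float(self._AGG_FUNCS[self.agg_mode](sob))
```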
There are now a lot of duplicated peak vs. valley criteria that differ in only two places: hist, peaks = self._get_peaks(hist) vs. hist, valleys = self._get_peaks(hist, True), and the hist vs. -hist usage.
One possible solution is to replace all of these criteria with a general class taking feature_type, agg_mode, and peak_type parameters:
```python
from scipy.signal import peak_widths

class CPeakFeature(APeaksClassifierCriterion):
    def __init__(
        self,
        feature_type: str,
        agg_mode: str,
        peak_type: str = "peak",
    ) -> None:
        if feature_type not in ["height", "distance", "width"]:
            raise ValueError("feature_type must be 'height', 'distance', or 'width'")
        if agg_mode not in ["max", "mean", "min", "count"]:
            raise ValueError("agg_mode must be 'max', 'mean', 'min', or 'count'")
        if peak_type not in ["peak", "valley"]:
            raise ValueError("peak_type must be 'peak' or 'valley'")
        self.feature_type = feature_type
        self.agg_mode = agg_mode
        self.peak_type = peak_type
        self.is_valleys = peak_type == "valley"
        self._agg_funcs = {"max": np.max, "mean": np.mean, "min": np.min}

    @property
    def name(self) -> str:
        return f"{self.peak_type.capitalize()} {self.feature_type.capitalize()} {self.agg_mode.capitalize()} Criterion"

    def score(self, hist: np.ndarray) -> float:
        hist_prep, indices = self._get_peaks(hist, self.is_valleys)
        if len(indices) == 0:
            return 0.0
        if self.feature_type != "height" and len(indices) <= 1:
            return 0.0
        if self.feature_type == "height":
            hist_sum = np.sum(hist_prep)
            if hist_sum == 0:
                return 0.0
            values = hist_prep[indices] / hist_sum
        elif self.feature_type == "distance":
            values = np.abs(np.diff(indices)) - 1
        elif self.feature_type == "width":
            hist_for_width = -hist_prep if self.is_valleys else hist_prep
            values, _, _, _ = peak_widths(hist_for_width, indices)
        if self.agg_mode == "count":
            return len(indices)
        agg_func = self._agg_funcs[self.agg_mode]
        result = agg_func(values)
        if self.feature_type in ["distance", "width"]:
            return result / len(hist)
        return result
```
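With this, something like a peak-height-max criterion would just be CPeakFeature(feature_type="height", agg_mode="max"), and its valley counterpart CPeakFeature(feature_type="height", agg_mode="max", peak_type="valley") (the original class names are not shown in the diff, so this mapping is a guess).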
Otherwise, you can come up with something else.
```python
class CBootKurt(ASampleClassifierCriterion):
    def __init__(self, n_boot: int = 200, state: int | None = None) -> None:
        self.n_boot = n_boot
        self.state = state

    @property
    def name(self) -> str:
        return "Bootstrap Kurtosis Criterion"

    def score(self, X: np.ndarray) -> float:
        np.random.seed(self.state)

        n = len(X)
        means = [kurtosis(np.random.choice(X, size=n, replace=True)) for _ in range(self.n_boot)]
        result = np.var(means)
        return result if not np.isnan(result) else 0
```
np.random.seed(self.state) sets the global random seed for NumPy. This means that if you run CBootKurt.score(), it will affect the outcome of every other random number generation in your entire program that uses np.random.
Suggested change:

```python
from numpy.random import Generator, PCG64

class CBootKurt(ASampleClassifierCriterion):
    def __init__(self, n_boot: int = 200, state: int | None = None) -> None:
        self.n_boot = n_boot
        self.rng = Generator(PCG64(state))

    @property
    def name(self) -> str:
        return "Bootstrap Kurtosis Criterion"

    def score(self, X: np.ndarray) -> float:
        n = len(X)
        means = [kurtosis(self.rng.choice(X, size=n, replace=True)) for _ in range(self.n_boot)]
        result = np.var(means)
        return result if not np.isnan(result) else 0
```
The Silhouette class almost duplicates Elbow; you should create a new abstract base class that encapsulates the shared KMeans logic.
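A minimal sketch of such a base class (the names `AKMeansKEstimator` and `_fit_models` are hypothetical; it assumes `sklearn.cluster.KMeans` and the `kmax`/`k_init` parameters from the diff below):

```python
from abc import ABC, abstractmethod

import numpy as np
from sklearn.cluster import KMeans

class AKMeansKEstimator(ABC):
    """Shared KMeans-fitting logic for Elbow- and Silhouette-style estimators."""

    def __init__(self, kmax: int, k_init: int = 2) -> None:
        self.k_init = k_init
        self.kmax = kmax

    def _fit_models(self, X: np.ndarray) -> dict[int, KMeans]:
        # One fitted model per candidate k; subclasses only differ in how they score them.
        return {
            k: KMeans(n_clusters=k).fit(X)
            for k in range(self.k_init, self.kmax + 1)
        }

    @abstractmethod
    def estimate(self, X: np.ndarray) -> int | None: ...
```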
```python
def __init__(
    self,
    kmax: int,
    k_init: int = 1,
```
k_init: int = 1 is too poor a default for KMeans; a single cluster is degenerate, so the search should start at k = 2 at least.
```python
def name(self) -> str:
    return "Elbow"

def estimate(self, X: np.ndarray) -> int:
```
The KneeLocator is not guaranteed to find an elbow. If it fails, knee_locator.elbow will be None.
Suggested change:

```python
def estimate(self, X: np.ndarray) -> int | None:
```
Or make a safe return to avoid a ValueError downstream, returning -1 for example.
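A minimal sketch of that safe-return variant; the body is only a guess at the Elbow logic (assuming `kneed.KneeLocator` over KMeans inertias), and the point is the `None` guard:

```python
import numpy as np
from kneed import KneeLocator
from sklearn.cluster import KMeans

def estimate(self, X: np.ndarray) -> int:
    ks = list(range(self.k_init, self.kmax + 1))
    inertias = [KMeans(n_clusters=k).fit(X).inertia_ for k in ks]
    knee_locator = KneeLocator(ks, inertias, curve="convex", direction="decreasing")
    # KneeLocator is not guaranteed to find an elbow; .elbow is None on failure
    if knee_locator.elbow is None:
        return -1
    return int(knee_locator.elbow)
```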
```python
def __init__(
    self,
    kmax: int,
    k_init: int = 1,
```
k_init: int = 1 is too poor a default for KMeans here as well.
- The base_criterions list is the definition of "hardcoded configuration". Make something like a factory function that receives the list of criteria to use, plus a config file that lists every criterion to feed into that factory. Right now, if you want to add or remove a criterion from your model, you have to add or comment out a row in this file (see the sketch after this list).
- These imports look kinda weird, but I don't know how to fix it. Maybe you can use CRITERION_REGISTRY = {} and then write a decorator that adds each criterion to this dict, and then parse it from the config file from point 1 (also sketched below). But for sure this might not be a good idea.
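A minimal sketch combining both points: a registry decorator plus a factory that instantiates the criteria named in a config file (the config format, file name, and `build_criteria` are all hypothetical; `CBootKurt` is the class from the diff above):

```python
import json

CRITERION_REGISTRY: dict[str, type] = {}

def register_criterion(cls: type) -> type:
    """Class decorator: expose a criterion class in the registry under its own name."""
    CRITERION_REGISTRY[cls.__name__] = cls
    return cls

@register_criterion
class CBootKurt(ASampleClassifierCriterion):
    ...  # body as in the diff above

def build_criteria(config_path: str) -> list:
    """Factory from point 1: instantiate every criterion listed in the config file."""
    with open(config_path) as f:
        names = json.load(f)  # e.g. ["CBootKurt", "CSobelFeature"]
    return [CRITERION_REGISTRY[name]() for name in names]
```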
Implementation of the preprocessing module: