diff --git a/.gitignore b/.gitignore
index e90b739..3d624ce 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1 +1,2 @@
 .pre-commit-config.yaml
+trufflehog/
diff --git a/src/spark_calibration/betacal.py b/src/spark_calibration/betacal.py
index ffb5e34..471ed24 100644
--- a/src/spark_calibration/betacal.py
+++ b/src/spark_calibration/betacal.py
@@ -65,7 +65,13 @@ def _log_expr(self, col: F.Column) -> F.Column:
         """Numerically stable log transformation."""
         return F.log(F.when(col < self.EPSILON, self.EPSILON).otherwise(col))

-    def _validate_input_df(self, df: DataFrame, score_col: str, label_col: str) -> None:
+    def _validate_input_df(
+        self,
+        df: DataFrame,
+        score_col: str,
+        label_col: str,
+        weight_col: Optional[str] = None,
+    ) -> None:
         """
         Validate input DataFrame and required columns.

@@ -73,6 +79,7 @@ def _validate_input_df(self, df: DataFrame, score_col: str, label_col: str) -> N
             df (DataFrame): Input dataframe.
             score_col (str): Column containing raw model scores.
             label_col (str): Column containing binary labels.
+            weight_col (str, optional): Column containing sample weights.

         Raises:
             ValueError: If DataFrame is empty or required columns are missing.
@@ -84,13 +91,21 @@ def _validate_input_df(self, df: DataFrame, score_col: str, label_col: str) -> N
             score_col in df.columns and label_col in df.columns
         ), f"Columns {score_col} and {label_col} must be present."

-    def _handle_null_values(self, df: DataFrame, score_col: str) -> DataFrame:
+        if weight_col is not None:
+            assert (
+                weight_col in df.columns
+            ), f"Column {weight_col} must be present when weight_col is provided."
+
+    def _handle_null_values(
+        self, df: DataFrame, score_col: str, weight_col: Optional[str] = None
+    ) -> DataFrame:
         """
-        Handle null values in the score column.
+        Handle null values in the score column (and weight column if provided).

         Args:
             df (DataFrame): Input dataframe.
             score_col (str): Column containing raw model scores.
+            weight_col (str, optional): Column containing sample weights.

         Returns:
             DataFrame: Cleaned DataFrame with null values removed.
@@ -99,23 +114,31 @@ def _handle_null_values(self, df: DataFrame, score_col: str) -> DataFrame:
             ValueError: If all rows contain null values.
         """
         total_rows = df.count()
-        df_clean = df.dropna(subset=[score_col])
+        drop_cols = [score_col]
+        if weight_col is not None:
+            drop_cols.append(weight_col)
+
+        df_clean = df.dropna(subset=drop_cols)
         rows_after_drop = df_clean.count()

         if rows_after_drop == 0:
-            raise ValueError(f"All rows contained null values in {score_col} column")
+            raise ValueError(f"All rows contained null values in {drop_cols} column(s)")

         dropped_rows = total_rows - rows_after_drop
         if dropped_rows > 0:
             logger.info(
                 f"Dropped {dropped_rows}/{total_rows} rows ({(dropped_rows/total_rows)*100:.2f}%) "
-                f"with null values in column '{score_col}'"
+                f"with null values in column(s) '{', '.join(drop_cols)}'"
             )

         return df_clean

     def _prepare_features(
-        self, df: DataFrame, score_col: str, label_col: str
+        self,
+        df: DataFrame,
+        score_col: str,
+        label_col: str,
+        weight_col: Optional[str] = None,
     ) -> DataFrame:
         """
         Prepare features for logistic regression with all possible combinations.
@@ -124,6 +147,7 @@ def _prepare_features(
             df (DataFrame): Input dataframe.
             score_col (str): Column containing raw model scores.
             label_col (str): Column containing binary labels.
+            weight_col (str, optional): Column containing sample weights.

         Returns:
             DataFrame: Transformed DataFrame with features ready for training.
@@ -131,15 +155,21 @@ def _prepare_features(
                 - features_both: Both log(score) and -log(1-score)
                 - features_score: Only log(score)
                 - features_complement: Only -log(1-score)
+                - weight: Sample weights (if weight_col is provided)
         """
         log_score = self._log_expr(F.col(score_col))
         log_one_minus_score = self._log_expr(1 - F.col(score_col))

-        df_transformed = df.select(
+        select_cols = [
             F.col(label_col).alias("label"),
             log_score.alias("log_score"),
             (-1 * log_one_minus_score).alias("log_score_complement"),
-        )
+        ]
+
+        if weight_col is not None:
+            select_cols.append(F.col(weight_col).alias("weight"))
+
+        df_transformed = df.select(*select_cols)

         # Prepare all possible feature combinations
         assembler_both = VectorAssembler(
@@ -156,35 +186,44 @@ def _prepare_features(
         df_with_score = assembler_score.transform(df_with_both)
         return assembler_complement.transform(df_with_score)

-    def _fit_logistic_regression(self, train_data: DataFrame) -> None:
+    def _fit_logistic_regression(
+        self, train_data: DataFrame, use_weights: bool = False
+    ) -> None:
         """
         Fit logistic regression model and set coefficients.

         Args:
             train_data (DataFrame): Prepared training data with features.
+            use_weights (bool): Whether to use sample weights (weight column must be "weight").
         """
         lr = LogisticRegression()
+        if use_weights:
+            lr.setWeightCol("weight")

         # First try with both features
-        model = lr.fit(
-            train_data.select("label", F.col("features_both").alias("features"))
-        )
+        select_cols = ["label", F.col("features_both").alias("features")]
+        if use_weights:
+            select_cols.append("weight")
+
+        model = lr.fit(train_data.select(*select_cols))
         coef = model.coefficients

         if coef[0] < 0:
             # Use only complement feature if first coefficient is negative
-            model = lr.fit(
-                train_data.select(
-                    "label", F.col("features_complement").alias("features")
-                )
-            )
+            select_cols = ["label", F.col("features_complement").alias("features")]
+            if use_weights:
+                select_cols.append("weight")
+
+            model = lr.fit(train_data.select(*select_cols))
             self.a = 0.0
             self.b = float(model.coefficients[0])
         elif coef[1] < 0:
             # Use only score feature if second coefficient is negative
-            model = lr.fit(
-                train_data.select("label", F.col("features_score").alias("features"))
-            )
+            select_cols = ["label", F.col("features_score").alias("features")]
+            if use_weights:
+                select_cols.append("weight")
+
+            model = lr.fit(train_data.select(*select_cols))
             self.a = float(model.coefficients[0])
             self.b = 0.0
         else:
@@ -214,7 +253,11 @@ def _validate_score_range(self, df: DataFrame, score_col: str) -> None:
         )

     def fit(
-        self, df: DataFrame, score_col: str = "score", label_col: str = "label"
+        self,
+        df: DataFrame,
+        score_col: str = "score",
+        label_col: str = "label",
+        weight_col: Optional[str] = None,
     ) -> "Betacal":
         """
         Fit a beta calibration model using logistic regression.
@@ -223,6 +266,8 @@ def fit(
             df (DataFrame): Input dataframe.
             score_col (str): Column containing raw model scores.
             label_col (str): Column containing binary labels.
+            weight_col (str, optional): Column containing sample weights for training.
+                If provided, each sample will be weighted during logistic regression fitting.

         Returns:
             Betacal: The fitted model instance (self).
@@ -230,11 +275,15 @@ def fit(
         Raises:
             ValueError: If input DataFrame is empty or contains all null values.
""" - self._validate_input_df(df, score_col, label_col) + self._validate_input_df(df, score_col, label_col, weight_col) self._validate_score_range(df, score_col) - df_clean = self._handle_null_values(df, score_col) - train_data = self._prepare_features(df_clean, score_col, label_col) - self._fit_logistic_regression(train_data) + df_clean = self._handle_null_values(df, score_col, weight_col) + + # Map weight_col to "weight" in prepared features + train_data = self._prepare_features(df_clean, score_col, label_col, weight_col) + + # Use weights if weight_col was provided (weight column is always named "weight" after preparation) + self._fit_logistic_regression(train_data, use_weights=(weight_col is not None)) return self def predict(