From a6bea2af86ebff252f57faca3eccc8997f5bfbe4 Mon Sep 17 00:00:00 2001 From: rahuld-meesho Date: Tue, 4 Nov 2025 20:27:52 +0530 Subject: [PATCH 1/3] Add weight_col parameter to Betacal.fit() for weighted training --- src/spark_calibration/betacal.py | 90 +++++++++++++++++++++++--------- 1 file changed, 64 insertions(+), 26 deletions(-) diff --git a/src/spark_calibration/betacal.py b/src/spark_calibration/betacal.py index ffb5e34..0ff33e5 100644 --- a/src/spark_calibration/betacal.py +++ b/src/spark_calibration/betacal.py @@ -65,7 +65,9 @@ def _log_expr(self, col: F.Column) -> F.Column: """Numerically stable log transformation.""" return F.log(F.when(col < self.EPSILON, self.EPSILON).otherwise(col)) - def _validate_input_df(self, df: DataFrame, score_col: str, label_col: str) -> None: + def _validate_input_df( + self, df: DataFrame, score_col: str, label_col: str, weight_col: Optional[str] = None + ) -> None: """ Validate input DataFrame and required columns. @@ -73,6 +75,7 @@ def _validate_input_df(self, df: DataFrame, score_col: str, label_col: str) -> N df (DataFrame): Input dataframe. score_col (str): Column containing raw model scores. label_col (str): Column containing binary labels. + weight_col (str, optional): Column containing sample weights. Raises: ValueError: If DataFrame is empty or required columns are missing. @@ -83,14 +86,20 @@ def _validate_input_df(self, df: DataFrame, score_col: str, label_col: str) -> N assert ( score_col in df.columns and label_col in df.columns ), f"Columns {score_col} and {label_col} must be present." + + if weight_col is not None: + assert weight_col in df.columns, f"Column {weight_col} must be present when weight_col is provided." - def _handle_null_values(self, df: DataFrame, score_col: str) -> DataFrame: + def _handle_null_values( + self, df: DataFrame, score_col: str, weight_col: Optional[str] = None + ) -> DataFrame: """ - Handle null values in the score column. + Handle null values in the score column (and weight column if provided). Args: df (DataFrame): Input dataframe. score_col (str): Column containing raw model scores. + weight_col (str, optional): Column containing sample weights. Returns: DataFrame: Cleaned DataFrame with null values removed. @@ -99,23 +108,27 @@ def _handle_null_values(self, df: DataFrame, score_col: str) -> DataFrame: ValueError: If all rows contain null values. """ total_rows = df.count() - df_clean = df.dropna(subset=[score_col]) + drop_cols = [score_col] + if weight_col is not None: + drop_cols.append(weight_col) + + df_clean = df.dropna(subset=drop_cols) rows_after_drop = df_clean.count() if rows_after_drop == 0: - raise ValueError(f"All rows contained null values in {score_col} column") + raise ValueError(f"All rows contained null values in {drop_cols} column(s)") dropped_rows = total_rows - rows_after_drop if dropped_rows > 0: logger.info( f"Dropped {dropped_rows}/{total_rows} rows ({(dropped_rows/total_rows)*100:.2f}%) " - f"with null values in column '{score_col}'" + f"with null values in column(s) '{', '.join(drop_cols)}'" ) return df_clean def _prepare_features( - self, df: DataFrame, score_col: str, label_col: str + self, df: DataFrame, score_col: str, label_col: str, weight_col: Optional[str] = None ) -> DataFrame: """ Prepare features for logistic regression with all possible combinations. @@ -124,6 +137,7 @@ def _prepare_features( df (DataFrame): Input dataframe. score_col (str): Column containing raw model scores. label_col (str): Column containing binary labels. + weight_col (str, optional): Column containing sample weights. Returns: DataFrame: Transformed DataFrame with features ready for training. @@ -131,15 +145,21 @@ def _prepare_features( - features_both: Both log(score) and -log(1-score) - features_score: Only log(score) - features_complement: Only -log(1-score) + - weight: Sample weights (if weight_col is provided) """ log_score = self._log_expr(F.col(score_col)) log_one_minus_score = self._log_expr(1 - F.col(score_col)) - df_transformed = df.select( + select_cols = [ F.col(label_col).alias("label"), log_score.alias("log_score"), (-1 * log_one_minus_score).alias("log_score_complement"), - ) + ] + + if weight_col is not None: + select_cols.append(F.col(weight_col).alias("weight")) + + df_transformed = df.select(*select_cols) # Prepare all possible feature combinations assembler_both = VectorAssembler( @@ -156,35 +176,42 @@ def _prepare_features( df_with_score = assembler_score.transform(df_with_both) return assembler_complement.transform(df_with_score) - def _fit_logistic_regression(self, train_data: DataFrame) -> None: + def _fit_logistic_regression(self, train_data: DataFrame, weight_col: Optional[str] = None) -> None: """ Fit logistic regression model and set coefficients. Args: train_data (DataFrame): Prepared training data with features. + weight_col (str, optional): Column name for sample weights in train_data. """ lr = LogisticRegression() + if weight_col is not None: + lr.setWeightCol(weight_col) # First try with both features - model = lr.fit( - train_data.select("label", F.col("features_both").alias("features")) - ) + select_cols = ["label", F.col("features_both").alias("features")] + if weight_col is not None: + select_cols.append(weight_col) + + model = lr.fit(train_data.select(*select_cols)) coef = model.coefficients if coef[0] < 0: # Use only complement feature if first coefficient is negative - model = lr.fit( - train_data.select( - "label", F.col("features_complement").alias("features") - ) - ) + select_cols = ["label", F.col("features_complement").alias("features")] + if weight_col is not None: + select_cols.append(weight_col) + + model = lr.fit(train_data.select(*select_cols)) self.a = 0.0 self.b = float(model.coefficients[0]) elif coef[1] < 0: # Use only score feature if second coefficient is negative - model = lr.fit( - train_data.select("label", F.col("features_score").alias("features")) - ) + select_cols = ["label", F.col("features_score").alias("features")] + if weight_col is not None: + select_cols.append(weight_col) + + model = lr.fit(train_data.select(*select_cols)) self.a = float(model.coefficients[0]) self.b = 0.0 else: @@ -214,7 +241,11 @@ def _validate_score_range(self, df: DataFrame, score_col: str) -> None: ) def fit( - self, df: DataFrame, score_col: str = "score", label_col: str = "label" + self, + df: DataFrame, + score_col: str = "score", + label_col: str = "label", + weight_col: Optional[str] = None, ) -> "Betacal": """ Fit a beta calibration model using logistic regression. @@ -223,6 +254,8 @@ def fit( df (DataFrame): Input dataframe. score_col (str): Column containing raw model scores. label_col (str): Column containing binary labels. + weight_col (str, optional): Column containing sample weights for training. + If provided, each sample will be weighted during logistic regression fitting. Returns: Betacal: The fitted model instance (self). @@ -230,11 +263,16 @@ def fit( Raises: ValueError: If input DataFrame is empty or contains all null values. """ - self._validate_input_df(df, score_col, label_col) + self._validate_input_df(df, score_col, label_col, weight_col) self._validate_score_range(df, score_col) - df_clean = self._handle_null_values(df, score_col) - train_data = self._prepare_features(df_clean, score_col, label_col) - self._fit_logistic_regression(train_data) + df_clean = self._handle_null_values(df, score_col, weight_col) + + # Map weight_col to "weight" in prepared features + train_data = self._prepare_features(df_clean, score_col, label_col, weight_col) + + # Use "weight" as the weight column name in train_data (if weights were provided) + weight_col_name = "weight" if weight_col is not None else None + self._fit_logistic_regression(train_data, weight_col_name) return self def predict( From 9d82c885acc43ed11401772db8a4451142df2b29 Mon Sep 17 00:00:00 2001 From: rahuld-meesho Date: Tue, 4 Nov 2025 20:32:10 +0530 Subject: [PATCH 2/3] Simplify weight column handling to always use 'weight' internally --- src/spark_calibration/betacal.py | 25 ++++++++++++------------- 1 file changed, 12 insertions(+), 13 deletions(-) diff --git a/src/spark_calibration/betacal.py b/src/spark_calibration/betacal.py index 0ff33e5..65626ec 100644 --- a/src/spark_calibration/betacal.py +++ b/src/spark_calibration/betacal.py @@ -176,22 +176,22 @@ def _prepare_features( df_with_score = assembler_score.transform(df_with_both) return assembler_complement.transform(df_with_score) - def _fit_logistic_regression(self, train_data: DataFrame, weight_col: Optional[str] = None) -> None: + def _fit_logistic_regression(self, train_data: DataFrame, use_weights: bool = False) -> None: """ Fit logistic regression model and set coefficients. Args: train_data (DataFrame): Prepared training data with features. - weight_col (str, optional): Column name for sample weights in train_data. + use_weights (bool): Whether to use sample weights (weight column must be "weight"). """ lr = LogisticRegression() - if weight_col is not None: - lr.setWeightCol(weight_col) + if use_weights: + lr.setWeightCol("weight") # First try with both features select_cols = ["label", F.col("features_both").alias("features")] - if weight_col is not None: - select_cols.append(weight_col) + if use_weights: + select_cols.append("weight") model = lr.fit(train_data.select(*select_cols)) coef = model.coefficients @@ -199,8 +199,8 @@ def _fit_logistic_regression(self, train_data: DataFrame, weight_col: Optional[s if coef[0] < 0: # Use only complement feature if first coefficient is negative select_cols = ["label", F.col("features_complement").alias("features")] - if weight_col is not None: - select_cols.append(weight_col) + if use_weights: + select_cols.append("weight") model = lr.fit(train_data.select(*select_cols)) self.a = 0.0 @@ -208,8 +208,8 @@ def _fit_logistic_regression(self, train_data: DataFrame, weight_col: Optional[s elif coef[1] < 0: # Use only score feature if second coefficient is negative select_cols = ["label", F.col("features_score").alias("features")] - if weight_col is not None: - select_cols.append(weight_col) + if use_weights: + select_cols.append("weight") model = lr.fit(train_data.select(*select_cols)) self.a = float(model.coefficients[0]) @@ -270,9 +270,8 @@ def fit( # Map weight_col to "weight" in prepared features train_data = self._prepare_features(df_clean, score_col, label_col, weight_col) - # Use "weight" as the weight column name in train_data (if weights were provided) - weight_col_name = "weight" if weight_col is not None else None - self._fit_logistic_regression(train_data, weight_col_name) + # Use weights if weight_col was provided (weight column is always named "weight" after preparation) + self._fit_logistic_regression(train_data, use_weights=(weight_col is not None)) return self def predict( From de4a33d60301fe126af2f08d5ab8eebae196e301 Mon Sep 17 00:00:00 2001 From: rahuld-meesho Date: Tue, 4 Nov 2025 20:36:12 +0530 Subject: [PATCH 3/3] black formatting --- .gitignore | 1 + src/spark_calibration/betacal.py | 36 +++++++++++++++++++++----------- 2 files changed, 25 insertions(+), 12 deletions(-) diff --git a/.gitignore b/.gitignore index e90b739..3d624ce 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,2 @@ .pre-commit-config.yaml +trufflehog/ diff --git a/src/spark_calibration/betacal.py b/src/spark_calibration/betacal.py index 65626ec..471ed24 100644 --- a/src/spark_calibration/betacal.py +++ b/src/spark_calibration/betacal.py @@ -66,7 +66,11 @@ def _log_expr(self, col: F.Column) -> F.Column: return F.log(F.when(col < self.EPSILON, self.EPSILON).otherwise(col)) def _validate_input_df( - self, df: DataFrame, score_col: str, label_col: str, weight_col: Optional[str] = None + self, + df: DataFrame, + score_col: str, + label_col: str, + weight_col: Optional[str] = None, ) -> None: """ Validate input DataFrame and required columns. @@ -86,9 +90,11 @@ def _validate_input_df( assert ( score_col in df.columns and label_col in df.columns ), f"Columns {score_col} and {label_col} must be present." - + if weight_col is not None: - assert weight_col in df.columns, f"Column {weight_col} must be present when weight_col is provided." + assert ( + weight_col in df.columns + ), f"Column {weight_col} must be present when weight_col is provided." def _handle_null_values( self, df: DataFrame, score_col: str, weight_col: Optional[str] = None @@ -111,7 +117,7 @@ def _handle_null_values( drop_cols = [score_col] if weight_col is not None: drop_cols.append(weight_col) - + df_clean = df.dropna(subset=drop_cols) rows_after_drop = df_clean.count() @@ -128,7 +134,11 @@ def _handle_null_values( return df_clean def _prepare_features( - self, df: DataFrame, score_col: str, label_col: str, weight_col: Optional[str] = None + self, + df: DataFrame, + score_col: str, + label_col: str, + weight_col: Optional[str] = None, ) -> DataFrame: """ Prepare features for logistic regression with all possible combinations. @@ -155,7 +165,7 @@ def _prepare_features( log_score.alias("log_score"), (-1 * log_one_minus_score).alias("log_score_complement"), ] - + if weight_col is not None: select_cols.append(F.col(weight_col).alias("weight")) @@ -176,7 +186,9 @@ def _prepare_features( df_with_score = assembler_score.transform(df_with_both) return assembler_complement.transform(df_with_score) - def _fit_logistic_regression(self, train_data: DataFrame, use_weights: bool = False) -> None: + def _fit_logistic_regression( + self, train_data: DataFrame, use_weights: bool = False + ) -> None: """ Fit logistic regression model and set coefficients. @@ -192,7 +204,7 @@ def _fit_logistic_regression(self, train_data: DataFrame, use_weights: bool = Fa select_cols = ["label", F.col("features_both").alias("features")] if use_weights: select_cols.append("weight") - + model = lr.fit(train_data.select(*select_cols)) coef = model.coefficients @@ -201,7 +213,7 @@ def _fit_logistic_regression(self, train_data: DataFrame, use_weights: bool = Fa select_cols = ["label", F.col("features_complement").alias("features")] if use_weights: select_cols.append("weight") - + model = lr.fit(train_data.select(*select_cols)) self.a = 0.0 self.b = float(model.coefficients[0]) @@ -210,7 +222,7 @@ def _fit_logistic_regression(self, train_data: DataFrame, use_weights: bool = Fa select_cols = ["label", F.col("features_score").alias("features")] if use_weights: select_cols.append("weight") - + model = lr.fit(train_data.select(*select_cols)) self.a = float(model.coefficients[0]) self.b = 0.0 @@ -266,10 +278,10 @@ def fit( self._validate_input_df(df, score_col, label_col, weight_col) self._validate_score_range(df, score_col) df_clean = self._handle_null_values(df, score_col, weight_col) - + # Map weight_col to "weight" in prepared features train_data = self._prepare_features(df_clean, score_col, label_col, weight_col) - + # Use weights if weight_col was provided (weight column is always named "weight" after preparation) self._fit_logistic_regression(train_data, use_weights=(weight_col is not None)) return self