1 change: 1 addition & 0 deletions .gitignore
@@ -1 +1,2 @@
.pre-commit-config.yaml
trufflehog/
101 changes: 75 additions & 26 deletions src/spark_calibration/betacal.py
@@ -65,14 +65,21 @@ def _log_expr(self, col: F.Column) -> F.Column:
"""Numerically stable log transformation."""
return F.log(F.when(col < self.EPSILON, self.EPSILON).otherwise(col))
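
A minimal sketch of what the clamp buys, on a toy DataFrame with a placeholder EPSILON (the real constant is Betacal.EPSILON): Spark's log of a non-positive value comes back null, so without the clamp a score of exactly 0 would silently turn into a null feature.

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()
demo = spark.createDataFrame([(0.0,), (0.3,)], ["score"])
eps = 1e-10  # placeholder value for illustration only

demo.select(
    F.log(F.col("score")).alias("raw_log"),  # null when score == 0.0
    F.log(
        F.when(F.col("score") < eps, eps).otherwise(F.col("score"))
    ).alias("clamped_log"),  # roughly -23.03 when score == 0.0
).show()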

def _validate_input_df(self, df: DataFrame, score_col: str, label_col: str) -> None:
def _validate_input_df(
self,
df: DataFrame,
score_col: str,
label_col: str,
weight_col: Optional[str] = None,
) -> None:
"""
Validate input DataFrame and required columns.

Args:
df (DataFrame): Input dataframe.
score_col (str): Column containing raw model scores.
label_col (str): Column containing binary labels.
weight_col (str, optional): Column containing sample weights.

Raises:
ValueError: If DataFrame is empty or required columns are missing.
@@ -84,13 +91,21 @@ def _validate_input_df(self, df: DataFrame, score_col: str, label_col: str) -> None:
score_col in df.columns and label_col in df.columns
), f"Columns {score_col} and {label_col} must be present."

def _handle_null_values(self, df: DataFrame, score_col: str) -> DataFrame:
if weight_col is not None:
assert (
weight_col in df.columns
), f"Column {weight_col} must be present when weight_col is provided."

def _handle_null_values(
self, df: DataFrame, score_col: str, weight_col: Optional[str] = None
) -> DataFrame:
"""
Handle null values in the score column.
Handle null values in the score column (and weight column if provided).

Args:
df (DataFrame): Input dataframe.
score_col (str): Column containing raw model scores.
weight_col (str, optional): Column containing sample weights.

Returns:
DataFrame: Cleaned DataFrame with null values removed.
@@ -99,23 +114,31 @@ def _handle_null_values(self, df: DataFrame, score_col: str) -> DataFrame:
ValueError: If all rows contain null values.
"""
total_rows = df.count()
df_clean = df.dropna(subset=[score_col])
drop_cols = [score_col]
if weight_col is not None:
drop_cols.append(weight_col)

df_clean = df.dropna(subset=drop_cols)
rows_after_drop = df_clean.count()

if rows_after_drop == 0:
raise ValueError(f"All rows contained null values in {score_col} column")
raise ValueError(f"All rows contained null values in {drop_cols} column(s)")

dropped_rows = total_rows - rows_after_drop
if dropped_rows > 0:
logger.info(
f"Dropped {dropped_rows}/{total_rows} rows ({(dropped_rows/total_rows)*100:.2f}%) "
f"with null values in column '{score_col}'"
f"with null values in column(s) '{', '.join(drop_cols)}'"
)

return df_clean
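
For reference, the same dropna pattern in isolation, on a throwaway DataFrame whose column names mirror the defaults used here (names illustrative):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(
    [(0.9, 1.0), (None, 2.0), (0.4, None)],
    ["score", "weight"],
)
# Passing both columns to subset mirrors drop_cols above: a null in either
# column removes the row, so only the (0.9, 1.0) row survives.
clean = df.dropna(subset=["score", "weight"])
clean.show()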

def _prepare_features(
self, df: DataFrame, score_col: str, label_col: str
self,
df: DataFrame,
score_col: str,
label_col: str,
weight_col: Optional[str] = None,
) -> DataFrame:
"""
Prepare features for logistic regression with all possible combinations.
@@ -124,22 +147,29 @@ def _prepare_features(
df (DataFrame): Input dataframe.
score_col (str): Column containing raw model scores.
label_col (str): Column containing binary labels.
weight_col (str, optional): Column containing sample weights.

Returns:
DataFrame: Transformed DataFrame with features ready for training.
Contains three feature vectors:
- features_both: Both log(score) and -log(1-score)
- features_score: Only log(score)
- features_complement: Only -log(1-score)
                - weight: Sample weights, carried alongside the feature vectors (only if weight_col is provided)
"""
log_score = self._log_expr(F.col(score_col))
log_one_minus_score = self._log_expr(1 - F.col(score_col))

df_transformed = df.select(
select_cols = [
F.col(label_col).alias("label"),
log_score.alias("log_score"),
(-1 * log_one_minus_score).alias("log_score_complement"),
)
]

if weight_col is not None:
select_cols.append(F.col(weight_col).alias("weight"))

df_transformed = df.select(*select_cols)

# Prepare all possible feature combinations
assembler_both = VectorAssembler(
@@ -156,35 +186,44 @@ def _prepare_features(
df_with_score = assembler_score.transform(df_with_both)
return assembler_complement.transform(df_with_score)
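
As a point of reference, the assembler step in isolation, assuming the input columns described in the docstring above (the log values are hand-typed, not real scores):

from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler

spark = SparkSession.builder.getOrCreate()
prepared = spark.createDataFrame(
    [(1, -0.11, 2.30), (0, -1.61, 0.22)],
    ["label", "log_score", "log_score_complement"],
)
# Assumed configuration of assembler_both: both log columns into one vector.
assembler_both = VectorAssembler(
    inputCols=["log_score", "log_score_complement"], outputCol="features_both"
)
assembler_both.transform(prepared).show(truncate=False)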

def _fit_logistic_regression(self, train_data: DataFrame) -> None:
def _fit_logistic_regression(
self, train_data: DataFrame, use_weights: bool = False
) -> None:
"""
Fit logistic regression model and set coefficients.

Args:
train_data (DataFrame): Prepared training data with features.
use_weights (bool): Whether to use sample weights (weight column must be "weight").
"""
lr = LogisticRegression()
if use_weights:
lr.setWeightCol("weight")

# First try with both features
model = lr.fit(
train_data.select("label", F.col("features_both").alias("features"))
)
select_cols = ["label", F.col("features_both").alias("features")]
if use_weights:
select_cols.append("weight")

model = lr.fit(train_data.select(*select_cols))
coef = model.coefficients

if coef[0] < 0:
# Use only complement feature if first coefficient is negative
model = lr.fit(
train_data.select(
"label", F.col("features_complement").alias("features")
)
)
select_cols = ["label", F.col("features_complement").alias("features")]
if use_weights:
select_cols.append("weight")

model = lr.fit(train_data.select(*select_cols))
self.a = 0.0
self.b = float(model.coefficients[0])
elif coef[1] < 0:
# Use only score feature if second coefficient is negative
model = lr.fit(
train_data.select("label", F.col("features_score").alias("features"))
)
select_cols = ["label", F.col("features_score").alias("features")]
if use_weights:
select_cols.append("weight")

model = lr.fit(train_data.select(*select_cols))
self.a = float(model.coefficients[0])
self.b = 0.0
else:
@@ -214,7 +253,11 @@ def _validate_score_range(self, df: DataFrame, score_col: str) -> None:
)

def fit(
self, df: DataFrame, score_col: str = "score", label_col: str = "label"
self,
df: DataFrame,
score_col: str = "score",
label_col: str = "label",
weight_col: Optional[str] = None,
) -> "Betacal":
"""
Fit a beta calibration model using logistic regression.
@@ -223,18 +266,24 @@ def fit(
df (DataFrame): Input dataframe.
score_col (str): Column containing raw model scores.
label_col (str): Column containing binary labels.
weight_col (str, optional): Column containing sample weights for training.
If provided, each sample will be weighted during logistic regression fitting.

Returns:
Betacal: The fitted model instance (self).

Raises:
ValueError: If input DataFrame is empty or contains all null values.
"""
self._validate_input_df(df, score_col, label_col)
self._validate_input_df(df, score_col, label_col, weight_col)
self._validate_score_range(df, score_col)
df_clean = self._handle_null_values(df, score_col)
train_data = self._prepare_features(df_clean, score_col, label_col)
self._fit_logistic_regression(train_data)
df_clean = self._handle_null_values(df, score_col, weight_col)

# Map weight_col to "weight" in prepared features
train_data = self._prepare_features(df_clean, score_col, label_col, weight_col)

# Use weights if weight_col was provided (weight column is always named "weight" after preparation)
self._fit_logistic_regression(train_data, use_weights=(weight_col is not None))
return self
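
An end-to-end usage sketch of the new weight_col path; the column names, the import path, and the bare Betacal() constructor call are assumptions based on the file layout and defaults in this diff, not a verified API:

from pyspark.sql import SparkSession
from spark_calibration import Betacal  # import path assumed from src/spark_calibration/

spark = SparkSession.builder.getOrCreate()
train = spark.createDataFrame(
    [(0.92, 1, 2.0), (0.15, 0, 1.0), (0.60, 1, 0.5), (0.30, 0, 1.0)],
    ["score", "label", "weight"],
)

bc = Betacal()  # constructor arguments, if any, are outside this diff
bc.fit(train, score_col="score", label_col="label", weight_col="weight")
# Per the changes above, the weight column is aliased to "weight" during feature
# preparation and passed to LogisticRegression via setWeightCol, so each row's
# contribution to the fit is scaled by its weight.
print(bc.a, bc.b)  # fitted coefficients on log(score) and -log(1 - score)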

def predict(