Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ dependencies = [
"openai-agents>=0.3.3",
"pip>=25.0.1",
"presidio-analyzer>=2.2.360",
"presidio-anonymizer>=2.2.360",
"thinc>=8.3.6",
]
classifiers = [
Expand Down
53 changes: 41 additions & 12 deletions src/guardrails/_base_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -246,8 +246,7 @@ def _apply_pii_masking_to_structured_content(
Returns:
Modified messages with PII masking applied to each text part
"""
from presidio_anonymizer import AnonymizerEngine
from presidio_anonymizer.entities import OperatorConfig
from guardrails.utils.anonymizer import OperatorConfig, anonymize

# Extract detected entity types and config
detected = pii_result.info.get("detected_entities", {})
Expand All @@ -256,18 +255,17 @@ def _apply_pii_masking_to_structured_content(

detect_encoded_pii = pii_result.info.get("detect_encoded_pii", False)

# Get Presidio engines - entity types are guaranteed valid from detection
# Get analyzer engine - entity types are guaranteed valid from detection
from .checks.text.pii import _get_analyzer_engine

analyzer = _get_analyzer_engine()
anonymizer = AnonymizerEngine()
entity_types = list(detected.keys())

# Create operators for each entity type
operators = {entity_type: OperatorConfig("replace", {"new_value": f"<{entity_type}>"}) for entity_type in entity_types}

def _mask_text(text: str) -> str:
"""Mask using Presidio's analyzer and anonymizer with Unicode normalization.
"""Mask using custom anonymizer with Unicode normalization.

Handles both plain and encoded PII consistently with main detection path.
"""
Expand Down Expand Up @@ -302,7 +300,7 @@ def _mask_text(text: str) -> str:
# Mask plain PII
masked = normalized
if has_plain_pii:
masked = anonymizer.anonymize(text=masked, analyzer_results=analyzer_results, operators=operators).text
masked = anonymize(text=masked, analyzer_results=analyzer_results, operators=operators).text

# Mask encoded PII if found
if has_encoded_pii:
Expand All @@ -311,19 +309,50 @@ def _mask_text(text: str) -> str:
decoded_results = analyzer.analyze(decoded_text_for_masking, entities=entity_types, language="en")

if decoded_results:
# Map detections back to mask encoded chunks
# Build list of (candidate, entity_type) pairs to mask
candidates_to_mask = []

for result in decoded_results:
detected_value = decoded_text_for_masking[result.start : result.end]
entity_type = result.entity_type

# Find candidate that contains this PII
# Find candidate that overlaps with this PII
# Use comprehensive overlap logic matching pii.py implementation
for candidate in candidates_for_masking:
if detected_value in candidate.decoded_text:
# Mask the encoded version
entity_marker = f"<{entity_type}_ENCODED>"
masked = masked[: candidate.start] + entity_marker + masked[candidate.end :]
if not candidate.decoded_text:
continue

candidate_lower = candidate.decoded_text.lower()
detected_lower = detected_value.lower()

# Check if candidate's decoded text overlaps with the detection
# Handle partial encodings where encoded span may include extra characters
# e.g., %3A%6a%6f%65%40 → ":joe@" but only "joe@" is in email "joe@domain.com"
has_overlap = (
candidate_lower in detected_lower # Candidate is substring of detection
or detected_lower in candidate_lower # Detection is substring of candidate
or (
len(candidate_lower) >= 3
and any( # Any 3-char chunk overlaps
candidate_lower[i : i + 3] in detected_lower
for i in range(0, len(candidate_lower) - 2, 2) # Step by 2 for efficiency
)
)
)

if has_overlap:
candidates_to_mask.append((candidate, entity_type))
break

# Sort by position (reverse) to mask from end to start
# This preserves position validity for subsequent replacements
candidates_to_mask.sort(key=lambda x: x[0].start, reverse=True)

# Mask from end to start
for candidate, entity_type in candidates_to_mask:
entity_marker = f"<{entity_type}_ENCODED>"
masked = masked[: candidate.start] + entity_marker + masked[candidate.end :]

return masked

# Mask each text part
Expand Down
79 changes: 49 additions & 30 deletions src/guardrails/checks/text/pii.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,13 +89,12 @@
from presidio_analyzer.predefined_recognizers.country_specific.korea.kr_rrn_recognizer import (
KrRrnRecognizer,
)
from presidio_anonymizer import AnonymizerEngine
from presidio_anonymizer.entities import OperatorConfig
from pydantic import BaseModel, ConfigDict, Field

from guardrails.registry import default_spec_registry
from guardrails.spec import GuardrailSpecMetadata
from guardrails.types import GuardrailResult
from guardrails.utils.anonymizer import OperatorConfig, anonymize

__all__ = ["pii"]

Expand Down Expand Up @@ -155,15 +154,53 @@ def _get_analyzer_engine() -> AnalyzerEngine:
)

# BIC/SWIFT code recognizer (8 or 11 characters: 4 bank + 2 country + 2 location + 3 branch)
bic_pattern = Pattern(
name="bic_swift_pattern",
regex=r"\b[A-Z]{4}[A-Z]{2}[A-Z0-9]{2}([A-Z0-9]{3})?\b",
score=0.75,
# Uses context-aware pattern to reduce false positives on common words like "CUSTOMER"
# Requires either:
# 1. Explicit prefix (SWIFT:, BIC:, Bank Code:, etc.) OR
# 2. Known bank code from major financial institutions
# This significantly reduces false positives while maintaining high recall for actual BIC codes

# Pattern 1: Explicit context with common BIC/SWIFT prefixes (high confidence)
# Case-insensitive for the context words, but code itself must be uppercase
bic_with_context_pattern = Pattern(
name="bic_with_context",
regex=r"(?i)(?:swift|bic|bank[\s-]?code|swift[\s-]?code|bic[\s-]?code)(?-i)[:\s=]+([A-Z]{4}[A-Z]{2}[A-Z0-9]{2}(?:[A-Z0-9]{3})?)\b",
score=0.95,
)

# Pattern 2: Known banking institutions (4-letter bank codes from major banks)
# This whitelist approach has very low false positive rate
# Only detects codes starting with known bank identifiers
known_bank_codes = (
"DEUT|CHAS|BARC|HSBC|BNPA|CITI|WELL|BOFA|JPMC|GSCC|MSNY|" # Major international
"COBA|DRSD|BYLADEM|MALADE|HYVEDEMM|" # Germany
"WFBI|USBC|" # US
"LOYD|MIDL|NWBK|RBOS|" # UK
"CRLY|SOGEFRPP|AGRIFRPP|" # France
"UBSW|CRESCHZZ|" # Switzerland
"SANB|BBVA|" # Spain
"UNCRITMM|BCITITMMXXX|" # Italy
"INGB|ABNA|RABO|" # Netherlands
"ROYA|TDOM|BNSC|" # Canada
"ANZB|NATA|WPAC|CTBA|" # Australia
"BKCHJPJT|MHCBJPJT|BOTKJPJT|" # Japan
"ICBKCNBJ|BKCHCNBJ|ABOCCNBJ|PCBCCNBJ|" # China
"HSBCHKHH|SCBLHKHH|" # Hong Kong
"DBSSSGSG|OCBCSGSG|UOVBSGSG|" # Singapore
"CZNB|SHBK|KOEX|HVBK|NACF|IBKO|KODB|HNBN|CITIKRSX" # South Korea
)

known_bic_pattern = Pattern(
name="known_bic_codes",
regex=rf"\b(?:{known_bank_codes})[A-Z]{{2}}[A-Z0-9]{{2}}(?:[A-Z0-9]{{3}})?\b",
score=0.90,
)

# Register both patterns
registry.add_recognizer(
PatternRecognizer(
supported_entity="BIC_SWIFT",
patterns=[bic_pattern],
patterns=[bic_with_context_pattern, known_bic_pattern],
supported_language="en",
)
)
Expand Down Expand Up @@ -192,19 +229,6 @@ def _get_analyzer_engine() -> AnalyzerEngine:
return engine


@functools.lru_cache(maxsize=1)
def _get_anonymizer_engine() -> AnonymizerEngine:
"""Return a cached AnonymizerEngine for PII masking.

Uses Presidio's built-in anonymization for optimal performance and
correct handling of overlapping entities, Unicode, and special characters.

Returns:
AnonymizerEngine: Configured anonymizer for replacing PII entities.
"""
return AnonymizerEngine()


class PIIEntity(str, Enum):
"""Supported PII entity types for detection.

Expand Down Expand Up @@ -460,9 +484,7 @@ def _try_decode_base64(text: str) -> str | None:
decoded_bytes = base64.b64decode(text, validate=True)
# Security: Fail closed - reject content > 10KB to prevent memory DoS and PII bypass
if len(decoded_bytes) > 10_000:
msg = (
f"Base64 decoded content too large ({len(decoded_bytes):,} bytes). Maximum allowed is 10KB."
)
msg = f"Base64 decoded content too large ({len(decoded_bytes):,} bytes). Maximum allowed is 10KB."
raise ValueError(msg)
# Check if result is valid UTF-8
return decoded_bytes.decode("utf-8", errors="strict")
Expand Down Expand Up @@ -590,11 +612,10 @@ def _build_decoded_text(text: str) -> tuple[str, list[EncodedCandidate]]:


def _mask_pii(text: str, detection: PiiDetectionResult, config: PIIConfig) -> tuple[str, dict[str, list[str]]]:
"""Mask detected PII using Presidio's AnonymizerEngine.
"""Mask detected PII using custom anonymizer.

Normalizes Unicode before masking to ensure consistency with detection.
Uses Presidio's built-in anonymization for optimal performance and
correct handling of overlapping entities, Unicode, and special characters.
Handles overlapping entities, Unicode, and special characters correctly.

If detect_encoded_pii is enabled, also detects and masks PII in
Base64/URL-encoded/hex strings using a hybrid approach.
Expand Down Expand Up @@ -627,13 +648,11 @@ def _mask_pii(text: str, detection: PiiDetectionResult, config: PIIConfig) -> tu
# No PII detected - return original text to preserve special characters
return text, {}

# Use Presidio's optimized anonymizer with replace operator
anonymizer = _get_anonymizer_engine()

# Create operators mapping each entity type to a replace operator
operators = {entity_type: OperatorConfig("replace", {"new_value": f"<{entity_type}>"}) for entity_type in detection.mapping.keys()}

result = anonymizer.anonymize(
# Use custom anonymizer
result = anonymize(
text=normalized_text,
analyzer_results=detection.analyzer_results,
operators=operators,
Expand Down
148 changes: 148 additions & 0 deletions src/guardrails/utils/anonymizer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
"""Custom anonymizer for PII masking.

This module provides a lightweight replacement for presidio-anonymizer,
implementing text masking functionality for detected PII entities.
"""

from __future__ import annotations

from collections.abc import Sequence
from dataclasses import dataclass
from typing import Any, Protocol


class RecognizerResult(Protocol):
    """Structural type for entity detections produced by presidio-analyzer.

    Any object exposing these three attributes (including presidio's own
    ``RecognizerResult``) satisfies this protocol, so no hard dependency on
    presidio types is needed here.

    Attributes:
        start: Index in the text where the entity begins.
        end: Index in the text where the entity ends (exclusive).
        entity_type: Detected entity label, e.g. "EMAIL_ADDRESS".
    """

    start: int
    end: int
    entity_type: str


@dataclass(frozen=True, slots=True)
class OperatorConfig:
"""Configuration for an anonymization operator.

Args:
operator_name: Name of the operator (e.g., "replace").
params: Parameters for the operator (e.g., {"new_value": "<EMAIL>"}).
"""

operator_name: str
params: dict[str, Any]


@dataclass(frozen=True, slots=True)
class AnonymizeResult:
"""Result of text anonymization.

Attributes:
text: The anonymized text with entities masked.
"""

text: str


def _resolve_overlaps(results: Sequence[RecognizerResult]) -> list[RecognizerResult]:
"""Remove overlapping entity spans, keeping longer/earlier ones.

When entities overlap, prioritize:
1. Longer spans over shorter ones
2. Earlier positions when spans are equal length

Args:
results: Sequence of recognizer results to resolve.

Returns:
List of non-overlapping recognizer results.

Examples:
>>> # If EMAIL_ADDRESS spans (0, 20) and PERSON spans (5, 10), keep EMAIL_ADDRESS
>>> # If two entities span (0, 10) and (5, 15), keep the one starting at 0
"""
if not results:
return []

# Sort by: 1) longer spans first, 2) earlier position for equal lengths
sorted_results = sorted(
results,
key=lambda r: (-(r.end - r.start), r.start),
)

# Filter out overlapping spans
non_overlapping: list[RecognizerResult] = []
for result in sorted_results:
# Check if this result overlaps with any already selected
overlaps = False
for selected in non_overlapping:
# Two spans overlap if one starts before the other ends
if (result.start < selected.end and result.end > selected.start):
overlaps = True
break

if not overlaps:
non_overlapping.append(result)

return non_overlapping


def anonymize(
    text: str,
    analyzer_results: Sequence[RecognizerResult],
    operators: dict[str, OperatorConfig],
) -> AnonymizeResult:
    """Anonymize text by replacing detected entities with placeholders.

    Replicates presidio-anonymizer's behavior for the "replace" operator,
    which is used to mask PII with placeholders like "<EMAIL_ADDRESS>".
    Entities whose type has no operator, or whose operator is not
    "replace", are left untouched.

    Args:
        text: The original text to anonymize.
        analyzer_results: Sequence of detected entities with positions.
        operators: Mapping from entity type to operator configuration.

    Returns:
        AnonymizeResult with masked text.

    Examples:
        >>> from collections import namedtuple
        >>> Result = namedtuple("Result", ["start", "end", "entity_type"])
        >>> results = [Result(start=10, end=25, entity_type="EMAIL_ADDRESS")]
        >>> operators = {"EMAIL_ADDRESS": OperatorConfig("replace", {"new_value": "<EMAIL_ADDRESS>"})}
        >>> result = anonymize("Contact: john@example.com", results, operators)
        >>> result.text
        'Contact: <EMAIL_ADDRESS>'
    """
    # Nothing to mask: return the input unchanged.
    if not text or not analyzer_results:
        return AnonymizeResult(text=text)

    # Drop overlapping detections, keeping the longer/earlier spans.
    spans = _resolve_overlaps(analyzer_results)

    # Splice replacements right-to-left so earlier offsets stay valid
    # after each substitution.
    masked = text
    for span in sorted(spans, key=lambda r: r.start, reverse=True):
        config = operators.get(span.entity_type)
        if config is None or config.operator_name != "replace":
            # Unknown or unsupported operator: leave this span as-is.
            continue
        replacement = config.params.get("new_value", f"<{span.entity_type}>")
        masked = masked[: span.start] + replacement + masked[span.end :]

    return AnonymizeResult(text=masked)

Loading
Loading