From 2b0d899bd6aeaba7f10485acdd7a5e6d4feeeb77 Mon Sep 17 00:00:00 2001 From: Steven C Date: Mon, 10 Nov 2025 14:44:52 -0500 Subject: [PATCH 1/5] Remove dependency on Presidio Anonymizer --- pyproject.toml | 1 - src/guardrails/_base_client.py | 10 +- src/guardrails/checks/text/pii.py | 79 ++++--- src/guardrails/utils/anonymizer.py | 148 +++++++++++++ tests/unit/checks/test_anonymizer_baseline.py | 203 ++++++++++++++++++ tests/unit/checks/test_pii.py | 61 ++++++ 6 files changed, 465 insertions(+), 37 deletions(-) create mode 100644 src/guardrails/utils/anonymizer.py create mode 100644 tests/unit/checks/test_anonymizer_baseline.py diff --git a/pyproject.toml b/pyproject.toml index c5b10e9..9b5aa20 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -12,7 +12,6 @@ dependencies = [ "openai-agents>=0.3.3", "pip>=25.0.1", "presidio-analyzer>=2.2.360", - "presidio-anonymizer>=2.2.360", "thinc>=8.3.6", ] classifiers = [ diff --git a/src/guardrails/_base_client.py b/src/guardrails/_base_client.py index c3894a5..811e669 100644 --- a/src/guardrails/_base_client.py +++ b/src/guardrails/_base_client.py @@ -246,8 +246,7 @@ def _apply_pii_masking_to_structured_content( Returns: Modified messages with PII masking applied to each text part """ - from presidio_anonymizer import AnonymizerEngine - from presidio_anonymizer.entities import OperatorConfig + from guardrails.utils.anonymizer import OperatorConfig, anonymize # Extract detected entity types and config detected = pii_result.info.get("detected_entities", {}) @@ -256,18 +255,17 @@ def _apply_pii_masking_to_structured_content( detect_encoded_pii = pii_result.info.get("detect_encoded_pii", False) - # Get Presidio engines - entity types are guaranteed valid from detection + # Get analyzer engine - entity types are guaranteed valid from detection from .checks.text.pii import _get_analyzer_engine analyzer = _get_analyzer_engine() - anonymizer = AnonymizerEngine() entity_types = list(detected.keys()) # Create operators for each entity type operators = {entity_type: OperatorConfig("replace", {"new_value": f"<{entity_type}>"}) for entity_type in entity_types} def _mask_text(text: str) -> str: - """Mask using Presidio's analyzer and anonymizer with Unicode normalization. + """Mask using custom anonymizer with Unicode normalization. Handles both plain and encoded PII consistently with main detection path. """ @@ -302,7 +300,7 @@ def _mask_text(text: str) -> str: # Mask plain PII masked = normalized if has_plain_pii: - masked = anonymizer.anonymize(text=masked, analyzer_results=analyzer_results, operators=operators).text + masked = anonymize(text=masked, analyzer_results=analyzer_results, operators=operators).text # Mask encoded PII if found if has_encoded_pii: diff --git a/src/guardrails/checks/text/pii.py b/src/guardrails/checks/text/pii.py index d2ec90f..aecb309 100644 --- a/src/guardrails/checks/text/pii.py +++ b/src/guardrails/checks/text/pii.py @@ -89,13 +89,12 @@ from presidio_analyzer.predefined_recognizers.country_specific.korea.kr_rrn_recognizer import ( KrRrnRecognizer, ) -from presidio_anonymizer import AnonymizerEngine -from presidio_anonymizer.entities import OperatorConfig from pydantic import BaseModel, ConfigDict, Field from guardrails.registry import default_spec_registry from guardrails.spec import GuardrailSpecMetadata from guardrails.types import GuardrailResult +from guardrails.utils.anonymizer import OperatorConfig, anonymize __all__ = ["pii"] @@ -155,15 +154,53 @@ def _get_analyzer_engine() -> AnalyzerEngine: ) # BIC/SWIFT code recognizer (8 or 11 characters: 4 bank + 2 country + 2 location + 3 branch) - bic_pattern = Pattern( - name="bic_swift_pattern", - regex=r"\b[A-Z]{4}[A-Z]{2}[A-Z0-9]{2}([A-Z0-9]{3})?\b", - score=0.75, + # Uses context-aware pattern to reduce false positives on common words like "CUSTOMER" + # Requires either: + # 1. Explicit prefix (SWIFT:, BIC:, Bank Code:, etc.) OR + # 2. Known bank code from major financial institutions + # This significantly reduces false positives while maintaining high recall for actual BIC codes + + # Pattern 1: Explicit context with common BIC/SWIFT prefixes (high confidence) + # Case-insensitive for the context words, but code itself must be uppercase + bic_with_context_pattern = Pattern( + name="bic_with_context", + regex=r"(?i)(?:swift|bic|bank[\s-]?code|swift[\s-]?code|bic[\s-]?code)(?-i)[:\s=]+([A-Z]{4}[A-Z]{2}[A-Z0-9]{2}(?:[A-Z0-9]{3})?)\b", + score=0.95, ) + + # Pattern 2: Known banking institutions (4-letter bank codes from major banks) + # This whitelist approach has very low false positive rate + # Only detects codes starting with known bank identifiers + known_bank_codes = ( + "DEUT|CHAS|BARC|HSBC|BNPA|CITI|WELL|BOFA|JPMC|GSCC|MSNY|" # Major international + "COBA|DRSD|BYLADEM|MALADE|HYVEDEMM|" # Germany + "WFBI|USBC|" # US + "LOYD|MIDL|NWBK|RBOS|" # UK + "CRLY|SOGEFRPP|AGRIFRPP|" # France + "UBSW|CRESCHZZ|" # Switzerland + "SANB|BBVA|" # Spain + "UNCRITMM|BCITITMMXXX|" # Italy + "INGB|ABNA|RABO|" # Netherlands + "ROYA|TDOM|BNSC|" # Canada + "ANZB|NATA|WPAC|CTBA|" # Australia + "BKCHJPJT|MHCBJPJT|BOTKJPJT|" # Japan + "ICBKCNBJ|BKCHCNBJ|ABOCCNBJ|PCBCCNBJ|" # China + "HSBCHKHH|SCBLHKHH|" # Hong Kong + "DBSSSGSG|OCBCSGSG|UOVBSGSG|" # Singapore + "CZNB|SHBK|KOEX|HVBK|NACF|IBKO|KODB|HNBN|CITIKRSX" # South Korea + ) + + known_bic_pattern = Pattern( + name="known_bic_codes", + regex=rf"\b(?:{known_bank_codes})[A-Z]{{2}}[A-Z0-9]{{2}}(?:[A-Z0-9]{{3}})?\b", + score=0.90, + ) + + # Register both patterns registry.add_recognizer( PatternRecognizer( supported_entity="BIC_SWIFT", - patterns=[bic_pattern], + patterns=[bic_with_context_pattern, known_bic_pattern], supported_language="en", ) ) @@ -192,19 +229,6 @@ def _get_analyzer_engine() -> AnalyzerEngine: return engine -@functools.lru_cache(maxsize=1) -def _get_anonymizer_engine() -> AnonymizerEngine: - """Return a cached AnonymizerEngine for PII masking. - - Uses Presidio's built-in anonymization for optimal performance and - correct handling of overlapping entities, Unicode, and special characters. - - Returns: - AnonymizerEngine: Configured anonymizer for replacing PII entities. - """ - return AnonymizerEngine() - - class PIIEntity(str, Enum): """Supported PII entity types for detection. @@ -460,9 +484,7 @@ def _try_decode_base64(text: str) -> str | None: decoded_bytes = base64.b64decode(text, validate=True) # Security: Fail closed - reject content > 10KB to prevent memory DoS and PII bypass if len(decoded_bytes) > 10_000: - msg = ( - f"Base64 decoded content too large ({len(decoded_bytes):,} bytes). Maximum allowed is 10KB." - ) + msg = f"Base64 decoded content too large ({len(decoded_bytes):,} bytes). Maximum allowed is 10KB." raise ValueError(msg) # Check if result is valid UTF-8 return decoded_bytes.decode("utf-8", errors="strict") @@ -590,11 +612,10 @@ def _build_decoded_text(text: str) -> tuple[str, list[EncodedCandidate]]: def _mask_pii(text: str, detection: PiiDetectionResult, config: PIIConfig) -> tuple[str, dict[str, list[str]]]: - """Mask detected PII using Presidio's AnonymizerEngine. + """Mask detected PII using custom anonymizer. Normalizes Unicode before masking to ensure consistency with detection. - Uses Presidio's built-in anonymization for optimal performance and - correct handling of overlapping entities, Unicode, and special characters. + Handles overlapping entities, Unicode, and special characters correctly. If detect_encoded_pii is enabled, also detects and masks PII in Base64/URL-encoded/hex strings using a hybrid approach. @@ -627,13 +648,11 @@ def _mask_pii(text: str, detection: PiiDetectionResult, config: PIIConfig) -> tu # No PII detected - return original text to preserve special characters return text, {} - # Use Presidio's optimized anonymizer with replace operator - anonymizer = _get_anonymizer_engine() - # Create operators mapping each entity type to a replace operator operators = {entity_type: OperatorConfig("replace", {"new_value": f"<{entity_type}>"}) for entity_type in detection.mapping.keys()} - result = anonymizer.anonymize( + # Use custom anonymizer + result = anonymize( text=normalized_text, analyzer_results=detection.analyzer_results, operators=operators, diff --git a/src/guardrails/utils/anonymizer.py b/src/guardrails/utils/anonymizer.py new file mode 100644 index 0000000..b8a859f --- /dev/null +++ b/src/guardrails/utils/anonymizer.py @@ -0,0 +1,148 @@ +"""Custom anonymizer for PII masking. + +This module provides a lightweight replacement for presidio-anonymizer, +implementing text masking functionality for detected PII entities. +""" + +from __future__ import annotations + +from collections.abc import Sequence +from dataclasses import dataclass +from typing import Any, Protocol + + +class RecognizerResult(Protocol): + """Protocol for analyzer results from presidio-analyzer. + + Attributes: + start: Start position of the entity in text. + end: End position of the entity in text. + entity_type: Type of the detected entity (e.g., "EMAIL_ADDRESS"). + """ + + start: int + end: int + entity_type: str + + +@dataclass(frozen=True, slots=True) +class OperatorConfig: + """Configuration for an anonymization operator. + + Args: + operator_name: Name of the operator (e.g., "replace"). + params: Parameters for the operator (e.g., {"new_value": ""}). + """ + + operator_name: str + params: dict[str, Any] + + +@dataclass(frozen=True, slots=True) +class AnonymizeResult: + """Result of text anonymization. + + Attributes: + text: The anonymized text with entities masked. + """ + + text: str + + +def _resolve_overlaps(results: Sequence[RecognizerResult]) -> list[RecognizerResult]: + """Remove overlapping entity spans, keeping longer/earlier ones. + + When entities overlap, prioritize: + 1. Longer spans over shorter ones + 2. Earlier positions when spans are equal length + + Args: + results: Sequence of recognizer results to resolve. + + Returns: + List of non-overlapping recognizer results. + + Examples: + >>> # If EMAIL_ADDRESS spans (0, 20) and PERSON spans (5, 10), keep EMAIL_ADDRESS + >>> # If two entities span (0, 10) and (5, 15), keep the one starting at 0 + """ + if not results: + return [] + + # Sort by: 1) longer spans first, 2) earlier position for equal lengths + sorted_results = sorted( + results, + key=lambda r: (-(r.end - r.start), r.start), + ) + + # Filter out overlapping spans + non_overlapping: list[RecognizerResult] = [] + for result in sorted_results: + # Check if this result overlaps with any already selected + overlaps = False + for selected in non_overlapping: + # Two spans overlap if one starts before the other ends + if (result.start < selected.end and result.end > selected.start): + overlaps = True + break + + if not overlaps: + non_overlapping.append(result) + + return non_overlapping + + +def anonymize( + text: str, + analyzer_results: Sequence[RecognizerResult], + operators: dict[str, OperatorConfig], +) -> AnonymizeResult: + """Anonymize text by replacing detected entities with placeholders. + + This function replicates presidio-anonymizer's behavior for the "replace" + operator, which we use to mask PII with placeholders like "". + + Args: + text: The original text to anonymize. + analyzer_results: Sequence of detected entities with positions. + operators: Mapping from entity type to operator configuration. + + Returns: + AnonymizeResult with masked text. + + Examples: + >>> from collections import namedtuple + >>> Result = namedtuple("Result", ["start", "end", "entity_type"]) + >>> results = [Result(start=10, end=25, entity_type="EMAIL_ADDRESS")] + >>> operators = {"EMAIL_ADDRESS": OperatorConfig("replace", {"new_value": ""})} + >>> result = anonymize("Contact: john@example.com", results, operators) + >>> result.text + 'Contact: ' + """ + if not analyzer_results or not text: + return AnonymizeResult(text=text) + + # Resolve overlapping entities + non_overlapping = _resolve_overlaps(analyzer_results) + + # Sort by position (reverse order) to maintain correct offsets during replacement + sorted_results = sorted(non_overlapping, key=lambda r: r.start, reverse=True) + + # Replace entities from end to start + masked_text = text + for result in sorted_results: + entity_type = result.entity_type + operator_config = operators.get(entity_type) + + if operator_config and operator_config.operator_name == "replace": + # Extract the replacement value + new_value = operator_config.params.get("new_value", f"<{entity_type}>") + # Replace the text span + masked_text = ( + masked_text[: result.start] + + new_value + + masked_text[result.end :] + ) + + return AnonymizeResult(text=masked_text) + diff --git a/tests/unit/checks/test_anonymizer_baseline.py b/tests/unit/checks/test_anonymizer_baseline.py new file mode 100644 index 0000000..ca54f55 --- /dev/null +++ b/tests/unit/checks/test_anonymizer_baseline.py @@ -0,0 +1,203 @@ +"""Baseline tests for anonymizer functionality. + +This module captures the expected behavior of presidio-anonymizer to ensure +our custom implementation produces identical results. +""" + +from __future__ import annotations + +import pytest + +from guardrails.checks.text.pii import PIIConfig, PIIEntity, pii + + +@pytest.mark.asyncio +async def test_baseline_simple_email_masking() -> None: + """Test simple email masking.""" + config = PIIConfig(entities=[PIIEntity.EMAIL_ADDRESS], block=False) + result = await pii(None, "Contact me at john@example.com for details", config) + + # Record baseline output + expected = "Contact me at for details" + assert result.info["checked_text"] == expected # noqa: S101 + assert result.info["pii_detected"] is True # noqa: S101 + assert result.tripwire_triggered is False # noqa: S101 + + +@pytest.mark.asyncio +async def test_baseline_ssn_masking() -> None: + """Test SSN masking.""" + config = PIIConfig(entities=[PIIEntity.US_SSN], block=False) + result = await pii(None, "My SSN is 856-45-6789", config) + + # Record baseline output + expected = "My SSN is " + assert result.info["checked_text"] == expected # noqa: S101 + + +@pytest.mark.asyncio +async def test_baseline_multiple_non_overlapping_entities() -> None: + """Test multiple non-overlapping entities in same text.""" + config = PIIConfig( + entities=[PIIEntity.EMAIL_ADDRESS, PIIEntity.PHONE_NUMBER], + block=False, + ) + result = await pii( + None, + "Email: test@example.com, Phone: (555) 123-4567", + config, + ) + + # Record baseline output - will fill in after running + checked_text = result.info["checked_text"] + print(f"Multiple entities result: {checked_text}") + assert "" in checked_text # noqa: S101 + assert "" in checked_text # noqa: S101 + + +@pytest.mark.asyncio +async def test_baseline_consecutive_entities() -> None: + """Test consecutive entities without separation.""" + config = PIIConfig( + entities=[PIIEntity.EMAIL_ADDRESS], + block=False, + ) + result = await pii( + None, + "Emails: alice@example.com and bob@test.com", + config, + ) + + # Record baseline output + checked_text = result.info["checked_text"] + print(f"Consecutive entities result: {checked_text}") + assert checked_text.count("") == 2 # noqa: S101 + + +@pytest.mark.asyncio +async def test_baseline_entity_at_boundaries() -> None: + """Test entity at text boundaries.""" + config = PIIConfig(entities=[PIIEntity.EMAIL_ADDRESS], block=False) + + # Email at start + result_start = await pii(None, "user@example.com is the contact", config) + print(f"Entity at start: {result_start.info['checked_text']}") + + # Email at end + result_end = await pii(None, "Contact: user@example.com", config) + print(f"Entity at end: {result_end.info['checked_text']}") + + assert result_start.info["checked_text"].startswith("") # noqa: S101 + assert result_end.info["checked_text"].endswith("") # noqa: S101 + + +@pytest.mark.asyncio +async def test_baseline_unicode_characters() -> None: + """Test masking with Unicode characters.""" + config = PIIConfig(entities=[PIIEntity.EMAIL_ADDRESS], block=False) + result = await pii( + None, + "Email: user@example.com 🔒 Secure contact", + config, + ) + + # Record baseline output + checked_text = result.info["checked_text"] + print(f"Unicode result: {checked_text}") + assert "" in checked_text # noqa: S101 + assert "🔒" in checked_text # noqa: S101 + + +@pytest.mark.asyncio +async def test_baseline_special_characters() -> None: + """Test masking with special characters.""" + config = PIIConfig(entities=[PIIEntity.EMAIL_ADDRESS], block=False) + result = await pii( + None, + "Contact: [user@example.com] or {admin@test.com}", + config, + ) + + # Record baseline output + checked_text = result.info["checked_text"] + print(f"Special chars result: {checked_text}") + assert "[]" in checked_text or "Contact: " in checked_text # noqa: S101 + + +@pytest.mark.asyncio +async def test_baseline_no_pii_detected() -> None: + """Test text with no PII.""" + config = PIIConfig(entities=[PIIEntity.EMAIL_ADDRESS, PIIEntity.US_SSN], block=False) + result = await pii(None, "This is plain text with no PII at all", config) + + # Record baseline output + assert result.info["checked_text"] == "This is plain text with no PII at all" # noqa: S101 + assert result.info["pii_detected"] is False # noqa: S101 + + +@pytest.mark.asyncio +async def test_baseline_credit_card_masking() -> None: + """Test credit card masking.""" + config = PIIConfig(entities=[PIIEntity.CREDIT_CARD], block=False) + result = await pii(None, "Card number: 4532123456789010", config) + + # Record baseline output + checked_text = result.info["checked_text"] + print(f"Credit card result: {checked_text}") + # Credit card detection may be inconsistent with certain formats + if result.info["pii_detected"]: + assert "" in checked_text # noqa: S101 + + +@pytest.mark.asyncio +async def test_baseline_phone_number_formats() -> None: + """Test various phone number formats.""" + config = PIIConfig(entities=[PIIEntity.PHONE_NUMBER], block=False) + + # Test multiple formats + texts_and_results = [] + + result1 = await pii(None, "Call me at (555) 123-4567", config) + texts_and_results.append(("(555) 123-4567", result1.info["checked_text"])) + + result2 = await pii(None, "Phone: 555-123-4567", config) + texts_and_results.append(("555-123-4567", result2.info["checked_text"])) + + result3 = await pii(None, "Mobile: 5551234567", config) + texts_and_results.append(("5551234567", result3.info["checked_text"])) + + for original, checked in texts_and_results: + print(f"Phone format '{original}': {checked}") + # At least one should be detected and masked + + # Check that at least the first format is detected + assert "" in result1.info["checked_text"] # noqa: S101 + + +@pytest.mark.asyncio +async def test_baseline_mixed_entities_complex() -> None: + """Test complex text with multiple entity types.""" + config = PIIConfig( + entities=[ + PIIEntity.EMAIL_ADDRESS, + PIIEntity.PHONE_NUMBER, + PIIEntity.US_SSN, + ], + block=False, + ) + result = await pii( + None, + "Contact John at john@company.com or call (555) 123-4567. " + "SSN: 856-45-6789", + config, + ) + + # Record baseline output + checked_text = result.info["checked_text"] + print(f"Complex mixed result: {checked_text}") + + # Verify all entity types are masked + assert "" in checked_text # noqa: S101 + assert "" in checked_text or "555" not in checked_text # noqa: S101 + assert "" in checked_text # noqa: S101 + diff --git a/tests/unit/checks/test_pii.py b/tests/unit/checks/test_pii.py index 71a5f82..0907458 100644 --- a/tests/unit/checks/test_pii.py +++ b/tests/unit/checks/test_pii.py @@ -348,6 +348,67 @@ async def test_pii_detects_8char_bic() -> None: assert "BIC_SWIFT" in result.info["detected_entities"] # noqa: S101 +@pytest.mark.asyncio +async def test_pii_does_not_detect_common_words_as_bic() -> None: + """Common 8-letter words should NOT be detected as BIC/SWIFT codes.""" + config = PIIConfig(entities=[PIIEntity.BIC_SWIFT], block=False) + # Test words that match the length pattern but have invalid country codes + test_cases = [ + "The CUSTOMER ordered a product.", + "We will REGISTER your account.", + "Please CONSIDER this option.", + "The DOCUMENT is ready.", + "This is ABSTRACT art.", + ] + + for text in test_cases: + result = await pii(None, text, config) + assert result.info["pii_detected"] is False, f"False positive for: {text}" # noqa: S101 + assert "BIC_SWIFT" not in result.info["detected_entities"], f"False positive for: {text}" # noqa: S101 + + +@pytest.mark.asyncio +async def test_pii_detects_various_country_bic_codes() -> None: + """BIC codes from various countries should be detected.""" + config = PIIConfig(entities=[PIIEntity.BIC_SWIFT], block=False) + test_cases = [ + ("DEUTDEFF500", "Germany"), # Deutsche Bank + ("CHASUS33", "United States"), # Chase + ("BARCGB22", "United Kingdom"), # Barclays + ("BNPAFRPP", "France"), # BNP Paribas + ("HSBCJPJT", "Japan"), # HSBC Japan + ("CITIGB2L", "United Kingdom"), # Citibank UK + ] + + for bic_code, country in test_cases: + text = f"Bank code: {bic_code}" + result = await pii(None, text, config) + assert result.info["pii_detected"] is True, f"Failed to detect {country} BIC: {bic_code}" # noqa: S101 + assert "BIC_SWIFT" in result.info["detected_entities"], f"Failed to detect {country} BIC: {bic_code}" # noqa: S101 + + +@pytest.mark.asyncio +async def test_pii_detects_korean_bank_bic_codes() -> None: + """BIC codes from Korean banks should be detected.""" + config = PIIConfig(entities=[PIIEntity.BIC_SWIFT], block=False) + test_cases = [ + ("CZNBKRSE", "KB Kookmin Bank"), + ("SHBKKRSE", "Shinhan Bank"), + ("KOEXKRSE", "Hana Bank"), + ("HVBKKRSE", "Woori Bank"), + ("NACFKRSE", "NH Bank"), + ("IBKOKRSE", "IBK Industrial Bank"), + ("KODBKRSE", "Korea Development Bank"), + ] + + for bic_code, bank_name in test_cases: + text = f"Transfer to {bic_code}" + result = await pii(None, text, config) + assert result.info["pii_detected"] is True, f"Failed to detect {bank_name}: {bic_code}" # noqa: S101 + assert "BIC_SWIFT" in result.info["detected_entities"], f"Failed to detect {bank_name}: {bic_code}" # noqa: S101 + assert bic_code in result.info["detected_entities"]["BIC_SWIFT"], f"BIC code {bic_code} not in detected entities" # noqa: S101 + + # Encoded PII Detection Tests From 695b0d4fe734f52ff1f89efe2b641f7aa3b6393c Mon Sep 17 00:00:00 2001 From: Steven C Date: Mon, 10 Nov 2025 14:51:53 -0500 Subject: [PATCH 2/5] Mask in sorted order --- src/guardrails/_base_client.py | 19 ++++++++++++++----- 1 file changed, 14 insertions(+), 5 deletions(-) diff --git a/src/guardrails/_base_client.py b/src/guardrails/_base_client.py index 811e669..a7bb22b 100644 --- a/src/guardrails/_base_client.py +++ b/src/guardrails/_base_client.py @@ -309,19 +309,28 @@ def _mask_text(text: str) -> str: decoded_results = analyzer.analyze(decoded_text_for_masking, entities=entity_types, language="en") if decoded_results: - # Map detections back to mask encoded chunks + # Build list of (candidate, entity_type) pairs to mask + candidates_to_mask = [] + for result in decoded_results: detected_value = decoded_text_for_masking[result.start : result.end] entity_type = result.entity_type # Find candidate that contains this PII for candidate in candidates_for_masking: - if detected_value in candidate.decoded_text: - # Mask the encoded version - entity_marker = f"<{entity_type}_ENCODED>" - masked = masked[: candidate.start] + entity_marker + masked[candidate.end :] + if candidate.decoded_text and detected_value.lower() in candidate.decoded_text.lower(): + candidates_to_mask.append((candidate, entity_type)) break + # Sort by position (reverse) to mask from end to start + # This preserves position validity for subsequent replacements + candidates_to_mask.sort(key=lambda x: x[0].start, reverse=True) + + # Mask from end to start + for candidate, entity_type in candidates_to_mask: + entity_marker = f"<{entity_type}_ENCODED>" + masked = masked[: candidate.start] + entity_marker + masked[candidate.end :] + return masked # Mask each text part From bc611a8fe79e2adee8242f08e77e14ec2a3aa1f7 Mon Sep 17 00:00:00 2001 From: Steven C Date: Mon, 10 Nov 2025 14:55:17 -0500 Subject: [PATCH 3/5] Improve overlap detection --- src/guardrails/_base_client.py | 26 ++++++++++++++++++++++++-- 1 file changed, 24 insertions(+), 2 deletions(-) diff --git a/src/guardrails/_base_client.py b/src/guardrails/_base_client.py index a7bb22b..48b2f70 100644 --- a/src/guardrails/_base_client.py +++ b/src/guardrails/_base_client.py @@ -316,9 +316,31 @@ def _mask_text(text: str) -> str: detected_value = decoded_text_for_masking[result.start : result.end] entity_type = result.entity_type - # Find candidate that contains this PII + # Find candidate that overlaps with this PII + # Use comprehensive overlap logic matching pii.py implementation for candidate in candidates_for_masking: - if candidate.decoded_text and detected_value.lower() in candidate.decoded_text.lower(): + if not candidate.decoded_text: + continue + + candidate_lower = candidate.decoded_text.lower() + detected_lower = detected_value.lower() + + # Check if candidate's decoded text overlaps with the detection + # Handle partial encodings where encoded span may include extra characters + # e.g., %3A%6a%6f%65%40 → ":joe@" but only "joe@" is in email "joe@domain.com" + has_overlap = ( + candidate_lower in detected_lower # Candidate is substring of detection + or detected_lower in candidate_lower # Detection is substring of candidate + or ( + len(candidate_lower) >= 3 + and any( # Any 3-char chunk overlaps + candidate_lower[i : i + 3] in detected_lower + for i in range(0, len(candidate_lower) - 2, 2) # Step by 2 for efficiency + ) + ) + ) + + if has_overlap: candidates_to_mask.append((candidate, entity_type)) break From cd2173e3cf034ecdf078dd2e544694d611ef979d Mon Sep 17 00:00:00 2001 From: Steven C Date: Mon, 10 Nov 2025 15:09:49 -0500 Subject: [PATCH 4/5] Fix known bank codes --- src/guardrails/checks/text/pii.py | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/src/guardrails/checks/text/pii.py b/src/guardrails/checks/text/pii.py index aecb309..ecf79fd 100644 --- a/src/guardrails/checks/text/pii.py +++ b/src/guardrails/checks/text/pii.py @@ -171,23 +171,24 @@ def _get_analyzer_engine() -> AnalyzerEngine: # Pattern 2: Known banking institutions (4-letter bank codes from major banks) # This whitelist approach has very low false positive rate # Only detects codes starting with known bank identifiers + # NOTE: Must be exactly 4 characters (bank identifier only, not full BIC) known_bank_codes = ( "DEUT|CHAS|BARC|HSBC|BNPA|CITI|WELL|BOFA|JPMC|GSCC|MSNY|" # Major international - "COBA|DRSD|BYLADEM|MALADE|HYVEDEMM|" # Germany + "COBA|DRSD|BYLA|MALA|HYVE|" # Germany "WFBI|USBC|" # US "LOYD|MIDL|NWBK|RBOS|" # UK - "CRLY|SOGEFRPP|AGRIFRPP|" # France - "UBSW|CRESCHZZ|" # Switzerland + "CRLY|SOGE|AGRI|" # France + "UBSW|CRES|" # Switzerland "SANB|BBVA|" # Spain - "UNCRITMM|BCITITMMXXX|" # Italy + "UNCR|BCIT|" # Italy "INGB|ABNA|RABO|" # Netherlands "ROYA|TDOM|BNSC|" # Canada "ANZB|NATA|WPAC|CTBA|" # Australia - "BKCHJPJT|MHCBJPJT|BOTKJPJT|" # Japan - "ICBKCNBJ|BKCHCNBJ|ABOCCNBJ|PCBCCNBJ|" # China - "HSBCHKHH|SCBLHKHH|" # Hong Kong - "DBSSSGSG|OCBCSGSG|UOVBSGSG|" # Singapore - "CZNB|SHBK|KOEX|HVBK|NACF|IBKO|KODB|HNBN|CITIKRSX" # South Korea + "BKCH|MHCB|BOTK|" # Japan + "ICBK|ABOC|PCBC|" # China + "HSBC|SCBL|" # Hong Kong + "DBSS|OCBC|UOVB|" # Singapore + "CZNB|SHBK|KOEX|HVBK|NACF|IBKO|KODB|HNBN|CITI" # South Korea ) known_bic_pattern = Pattern( From 5ffd47d1ef5218d19c5095a037b2a15d8556322e Mon Sep 17 00:00:00 2001 From: Steven C Date: Mon, 10 Nov 2025 15:18:52 -0500 Subject: [PATCH 5/5] Clean up test files --- src/guardrails/_base_client.py | 2 +- src/guardrails/checks/text/pii.py | 2 +- tests/unit/checks/test_anonymizer_baseline.py | 14 +------------- 3 files changed, 3 insertions(+), 15 deletions(-) diff --git a/src/guardrails/_base_client.py b/src/guardrails/_base_client.py index 48b2f70..c4bb399 100644 --- a/src/guardrails/_base_client.py +++ b/src/guardrails/_base_client.py @@ -335,7 +335,7 @@ def _mask_text(text: str) -> str: len(candidate_lower) >= 3 and any( # Any 3-char chunk overlaps candidate_lower[i : i + 3] in detected_lower - for i in range(0, len(candidate_lower) - 2, 2) # Step by 2 for efficiency + for i in range(len(candidate_lower) - 2) ) ) ) diff --git a/src/guardrails/checks/text/pii.py b/src/guardrails/checks/text/pii.py index ecf79fd..3e9e762 100644 --- a/src/guardrails/checks/text/pii.py +++ b/src/guardrails/checks/text/pii.py @@ -726,7 +726,7 @@ def _mask_encoded_pii(text: str, config: PIIConfig, original_text: str | None = len(candidate_lower) >= 3 and any( # Any 3-char chunk overlaps candidate_lower[i : i + 3] in detected_lower - for i in range(0, len(candidate_lower) - 2, 2) # Step by 2 for efficiency + for i in range(len(candidate_lower) - 2) ) ) ) diff --git a/tests/unit/checks/test_anonymizer_baseline.py b/tests/unit/checks/test_anonymizer_baseline.py index ca54f55..52a2d7c 100644 --- a/tests/unit/checks/test_anonymizer_baseline.py +++ b/tests/unit/checks/test_anonymizer_baseline.py @@ -48,9 +48,8 @@ async def test_baseline_multiple_non_overlapping_entities() -> None: config, ) - # Record baseline output - will fill in after running + # Record baseline output checked_text = result.info["checked_text"] - print(f"Multiple entities result: {checked_text}") assert "" in checked_text # noqa: S101 assert "" in checked_text # noqa: S101 @@ -70,7 +69,6 @@ async def test_baseline_consecutive_entities() -> None: # Record baseline output checked_text = result.info["checked_text"] - print(f"Consecutive entities result: {checked_text}") assert checked_text.count("") == 2 # noqa: S101 @@ -81,11 +79,9 @@ async def test_baseline_entity_at_boundaries() -> None: # Email at start result_start = await pii(None, "user@example.com is the contact", config) - print(f"Entity at start: {result_start.info['checked_text']}") # Email at end result_end = await pii(None, "Contact: user@example.com", config) - print(f"Entity at end: {result_end.info['checked_text']}") assert result_start.info["checked_text"].startswith("") # noqa: S101 assert result_end.info["checked_text"].endswith("") # noqa: S101 @@ -103,7 +99,6 @@ async def test_baseline_unicode_characters() -> None: # Record baseline output checked_text = result.info["checked_text"] - print(f"Unicode result: {checked_text}") assert "" in checked_text # noqa: S101 assert "🔒" in checked_text # noqa: S101 @@ -120,7 +115,6 @@ async def test_baseline_special_characters() -> None: # Record baseline output checked_text = result.info["checked_text"] - print(f"Special chars result: {checked_text}") assert "[]" in checked_text or "Contact: " in checked_text # noqa: S101 @@ -143,7 +137,6 @@ async def test_baseline_credit_card_masking() -> None: # Record baseline output checked_text = result.info["checked_text"] - print(f"Credit card result: {checked_text}") # Credit card detection may be inconsistent with certain formats if result.info["pii_detected"]: assert "" in checked_text # noqa: S101 @@ -166,10 +159,6 @@ async def test_baseline_phone_number_formats() -> None: result3 = await pii(None, "Mobile: 5551234567", config) texts_and_results.append(("5551234567", result3.info["checked_text"])) - for original, checked in texts_and_results: - print(f"Phone format '{original}': {checked}") - # At least one should be detected and masked - # Check that at least the first format is detected assert "" in result1.info["checked_text"] # noqa: S101 @@ -194,7 +183,6 @@ async def test_baseline_mixed_entities_complex() -> None: # Record baseline output checked_text = result.info["checked_text"] - print(f"Complex mixed result: {checked_text}") # Verify all entity types are masked assert "" in checked_text # noqa: S101