|
89 | 89 | from presidio_analyzer.predefined_recognizers.country_specific.korea.kr_rrn_recognizer import ( |
90 | 90 | KrRrnRecognizer, |
91 | 91 | ) |
92 | | -from presidio_anonymizer import AnonymizerEngine |
93 | | -from presidio_anonymizer.entities import OperatorConfig |
94 | 92 | from pydantic import BaseModel, ConfigDict, Field |
95 | 93 |
|
96 | 94 | from guardrails.registry import default_spec_registry |
97 | 95 | from guardrails.spec import GuardrailSpecMetadata |
98 | 96 | from guardrails.types import GuardrailResult |
| 97 | +from guardrails.utils.anonymizer import OperatorConfig, anonymize |
99 | 98 |
|
100 | 99 | __all__ = ["pii"] |
101 | 100 |
|
@@ -155,15 +154,54 @@ def _get_analyzer_engine() -> AnalyzerEngine: |
155 | 154 | ) |
156 | 155 |
|
157 | 156 | # BIC/SWIFT code recognizer (8 or 11 characters: 4 bank + 2 country + 2 location + optional 3 branch) |
158 | | - bic_pattern = Pattern( |
159 | | - name="bic_swift_pattern", |
160 | | - regex=r"\b[A-Z]{4}[A-Z]{2}[A-Z0-9]{2}([A-Z0-9]{3})?\b", |
161 | | - score=0.75, |
| 157 | + # Uses context-aware pattern to reduce false positives on common words like "CUSTOMER" |
| 158 | + # Requires either: |
| 159 | + # 1. Explicit prefix (SWIFT:, BIC:, Bank Code:, etc.) OR |
| 160 | + # 2. Known bank code from major financial institutions |
| 161 | + # This significantly reduces false positives while maintaining high recall for actual BIC codes |
| 162 | + |
| 163 | + # Pattern 1: Explicit context with common BIC/SWIFT prefixes (high confidence) |
| 164 | + # Case-insensitive for the context words, but the code itself must be uppercase |
| 165 | + bic_with_context_pattern = Pattern( |
| 166 | + name="bic_with_context", |
| 167 | + regex=r"(?i)(?:swift|bic|bank[\s-]?code|swift[\s-]?code|bic[\s-]?code)(?-i)[:\s=]+([A-Z]{4}[A-Z]{2}[A-Z0-9]{2}(?:[A-Z0-9]{3})?)\b", |
| 168 | + score=0.95, |
162 | 169 | ) |
| 170 | + |
| 171 | + # Pattern 2: Known banking institutions (4-letter bank codes from major banks) |
| 172 | + # This whitelist approach has very low false positive rate |
| 173 | + # Only detects codes starting with known bank identifiers |
| 174 | + # NOTE: Each entry must be exactly 4 characters (the bank identifier only, not a full BIC) |
| 175 | + known_bank_codes = ( |
| 176 | + "DEUT|CHAS|BARC|HSBC|BNPA|CITI|WELL|BOFA|JPMC|GSCC|MSNY|" # Major international |
| 177 | + "COBA|DRSD|BYLA|MALA|HYVE|" # Germany |
| 178 | + "WFBI|USBC|" # US |
| 179 | + "LOYD|MIDL|NWBK|RBOS|" # UK |
| 180 | + "CRLY|SOGE|AGRI|" # France |
| 181 | + "UBSW|CRES|" # Switzerland |
| 182 | + "SANB|BBVA|" # Spain |
| 183 | + "UNCR|BCIT|" # Italy |
| 184 | + "INGB|ABNA|RABO|" # Netherlands |
| 185 | + "ROYA|TDOM|BNSC|" # Canada |
| 186 | + "ANZB|NATA|WPAC|CTBA|" # Australia |
| 187 | + "BKCH|MHCB|BOTK|" # Japan |
| 188 | + "ICBK|ABOC|PCBC|" # China |
| 189 | + "HSBC|SCBL|" # Hong Kong |
| 190 | + "DBSS|OCBC|UOVB|" # Singapore |
| 191 | + "CZNB|SHBK|KOEX|HVBK|NACF|IBKO|KODB|HNBN|CITI" # South Korea |
| 192 | + ) |
| 193 | + |
| 194 | + known_bic_pattern = Pattern( |
| 195 | + name="known_bic_codes", |
| 196 | + regex=rf"\b(?:{known_bank_codes})[A-Z]{{2}}[A-Z0-9]{{2}}(?:[A-Z0-9]{{3}})?\b", |
| 197 | + score=0.90, |
| 198 | + ) |
| 199 | + |
| 200 | + # Register both patterns |
163 | 201 | registry.add_recognizer( |
164 | 202 | PatternRecognizer( |
165 | 203 | supported_entity="BIC_SWIFT", |
166 | | - patterns=[bic_pattern], |
| 204 | + patterns=[bic_with_context_pattern, known_bic_pattern], |
167 | 205 | supported_language="en", |
168 | 206 | ) |
169 | 207 | ) |
@@ -192,19 +230,6 @@ def _get_analyzer_engine() -> AnalyzerEngine: |
192 | 230 | return engine |
193 | 231 |
|
194 | 232 |
|
195 | | -@functools.lru_cache(maxsize=1) |
196 | | -def _get_anonymizer_engine() -> AnonymizerEngine: |
197 | | - """Return a cached AnonymizerEngine for PII masking. |
198 | | -
|
199 | | - Uses Presidio's built-in anonymization for optimal performance and |
200 | | - correct handling of overlapping entities, Unicode, and special characters. |
201 | | -
|
202 | | - Returns: |
203 | | - AnonymizerEngine: Configured anonymizer for replacing PII entities. |
204 | | - """ |
205 | | - return AnonymizerEngine() |
206 | | - |
207 | | - |
208 | 233 | class PIIEntity(str, Enum): |
209 | 234 | """Supported PII entity types for detection. |
210 | 235 |
|
@@ -460,9 +485,7 @@ def _try_decode_base64(text: str) -> str | None: |
460 | 485 | decoded_bytes = base64.b64decode(text, validate=True) |
461 | 486 | # Security: Fail closed - reject content > 10KB to prevent memory DoS and PII bypass |
462 | 487 | if len(decoded_bytes) > 10_000: |
463 | | - msg = ( |
464 | | - f"Base64 decoded content too large ({len(decoded_bytes):,} bytes). Maximum allowed is 10KB." |
465 | | - ) |
| 488 | + msg = f"Base64 decoded content too large ({len(decoded_bytes):,} bytes). Maximum allowed is 10KB." |
466 | 489 | raise ValueError(msg) |
467 | 490 | # Check if result is valid UTF-8 |
468 | 491 | return decoded_bytes.decode("utf-8", errors="strict") |
@@ -590,11 +613,10 @@ def _build_decoded_text(text: str) -> tuple[str, list[EncodedCandidate]]: |
590 | 613 |
|
591 | 614 |
|
592 | 615 | def _mask_pii(text: str, detection: PiiDetectionResult, config: PIIConfig) -> tuple[str, dict[str, list[str]]]: |
593 | | - """Mask detected PII using Presidio's AnonymizerEngine. |
| 616 | + """Mask detected PII using custom anonymizer. |
594 | 617 |
|
595 | 618 | Normalizes Unicode before masking to ensure consistency with detection. |
596 | | - Uses Presidio's built-in anonymization for optimal performance and |
597 | | - correct handling of overlapping entities, Unicode, and special characters. |
| 619 | + Handles overlapping entities, Unicode, and special characters correctly. |
598 | 620 |
|
599 | 621 | If detect_encoded_pii is enabled, also detects and masks PII in |
600 | 622 | Base64/URL-encoded/hex strings using a hybrid approach. |
@@ -627,13 +649,11 @@ def _mask_pii(text: str, detection: PiiDetectionResult, config: PIIConfig) -> tu |
627 | 649 | # No PII detected - return original text to preserve special characters |
628 | 650 | return text, {} |
629 | 651 |
|
630 | | - # Use Presidio's optimized anonymizer with replace operator |
631 | | - anonymizer = _get_anonymizer_engine() |
632 | | - |
633 | 652 | # Create operators mapping each entity type to a replace operator |
634 | 653 | operators = {entity_type: OperatorConfig("replace", {"new_value": f"<{entity_type}>"}) for entity_type in detection.mapping.keys()} |
635 | 654 |
|
636 | | - result = anonymizer.anonymize( |
| 655 | + # Use custom anonymizer |
| 656 | + result = anonymize( |
637 | 657 | text=normalized_text, |
638 | 658 | analyzer_results=detection.analyzer_results, |
639 | 659 | operators=operators, |
@@ -706,7 +726,7 @@ def _mask_encoded_pii(text: str, config: PIIConfig, original_text: str | None = |
706 | 726 | len(candidate_lower) >= 3 |
707 | 727 | and any( # Any 3-char chunk overlaps |
708 | 728 | candidate_lower[i : i + 3] in detected_lower |
709 | | - for i in range(0, len(candidate_lower) - 2, 2) # Step by 2 for efficiency |
| 729 | + for i in range(len(candidate_lower) - 2) |
710 | 730 | ) |
711 | 731 | ) |
712 | 732 | ) |
|
0 commit comments