-
Notifications
You must be signed in to change notification settings - Fork 17
Dev/steven/remove anonymizer #47
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -89,13 +89,12 @@ | |||||
| from presidio_analyzer.predefined_recognizers.country_specific.korea.kr_rrn_recognizer import ( | ||||||
| KrRrnRecognizer, | ||||||
| ) | ||||||
| from presidio_anonymizer import AnonymizerEngine | ||||||
| from presidio_anonymizer.entities import OperatorConfig | ||||||
| from pydantic import BaseModel, ConfigDict, Field | ||||||
|
|
||||||
| from guardrails.registry import default_spec_registry | ||||||
| from guardrails.spec import GuardrailSpecMetadata | ||||||
| from guardrails.types import GuardrailResult | ||||||
| from guardrails.utils.anonymizer import OperatorConfig, anonymize | ||||||
|
|
||||||
| __all__ = ["pii"] | ||||||
|
|
||||||
|
|
@@ -155,15 +154,54 @@ def _get_analyzer_engine() -> AnalyzerEngine: | |||||
| ) | ||||||
|
|
||||||
| # BIC/SWIFT code recognizer (8 or 11 characters: 4 bank + 2 country + 2 location + 3 branch) | ||||||
| bic_pattern = Pattern( | ||||||
| name="bic_swift_pattern", | ||||||
| regex=r"\b[A-Z]{4}[A-Z]{2}[A-Z0-9]{2}([A-Z0-9]{3})?\b", | ||||||
| score=0.75, | ||||||
| # Uses context-aware pattern to reduce false positives on common words like "CUSTOMER" | ||||||
| # Requires either: | ||||||
| # 1. Explicit prefix (SWIFT:, BIC:, Bank Code:, etc.) OR | ||||||
| # 2. Known bank code from major financial institutions | ||||||
| # This significantly reduces false positives while maintaining high recall for actual BIC codes | ||||||
|
|
||||||
| # Pattern 1: Explicit context with common BIC/SWIFT prefixes (high confidence) | ||||||
| # Case-insensitive for the context words, but code itself must be uppercase | ||||||
| bic_with_context_pattern = Pattern( | ||||||
| name="bic_with_context", | ||||||
| regex=r"(?i)(?:swift|bic|bank[\s-]?code|swift[\s-]?code|bic[\s-]?code)(?-i)[:\s=]+([A-Z]{4}[A-Z]{2}[A-Z0-9]{2}(?:[A-Z0-9]{3})?)\b", | ||||||
| score=0.95, | ||||||
| ) | ||||||
|
|
||||||
| # Pattern 2: Known banking institutions (4-letter bank codes from major banks) | ||||||
| # This whitelist approach has very low false positive rate | ||||||
| # Only detects codes starting with known bank identifiers | ||||||
| # NOTE: Must be exactly 4 characters (bank identifier only, not full BIC) | ||||||
| known_bank_codes = ( | ||||||
| "DEUT|CHAS|BARC|HSBC|BNPA|CITI|WELL|BOFA|JPMC|GSCC|MSNY|" # Major international | ||||||
| "COBA|DRSD|BYLA|MALA|HYVE|" # Germany | ||||||
| "WFBI|USBC|" # US | ||||||
| "LOYD|MIDL|NWBK|RBOS|" # UK | ||||||
| "CRLY|SOGE|AGRI|" # France | ||||||
| "UBSW|CRES|" # Switzerland | ||||||
| "SANB|BBVA|" # Spain | ||||||
| "UNCR|BCIT|" # Italy | ||||||
| "INGB|ABNA|RABO|" # Netherlands | ||||||
| "ROYA|TDOM|BNSC|" # Canada | ||||||
| "ANZB|NATA|WPAC|CTBA|" # Australia | ||||||
| "BKCH|MHCB|BOTK|" # Japan | ||||||
| "ICBK|ABOC|PCBC|" # China | ||||||
| "HSBC|SCBL|" # Hong Kong | ||||||
| "DBSS|OCBC|UOVB|" # Singapore | ||||||
| "CZNB|SHBK|KOEX|HVBK|NACF|IBKO|KODB|HNBN|CITI" # South Korea | ||||||
|
||||||
| "CZNB|SHBK|KOEX|HVBK|NACF|IBKO|KODB|HNBN|CITI" # South Korea | |
| "CZNB|SHBK|KOEX|HVBK|NACF|IBKO|KODB|HNBN" # South Korea |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[nit]
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,148 @@ | ||
| """Custom anonymizer for PII masking. | ||
|
|
||
| This module provides a lightweight replacement for presidio-anonymizer, | ||
| implementing text masking functionality for detected PII entities. | ||
| """ | ||
|
|
||
| from __future__ import annotations | ||
|
|
||
| from collections.abc import Sequence | ||
| from dataclasses import dataclass | ||
| from typing import Any, Protocol | ||
|
|
||
|
|
||
| class RecognizerResult(Protocol): | ||
| """Protocol for analyzer results from presidio-analyzer. | ||
|
|
||
| Attributes: | ||
| start: Start position of the entity in text. | ||
| end: End position of the entity in text. | ||
| entity_type: Type of the detected entity (e.g., "EMAIL_ADDRESS"). | ||
| """ | ||
|
|
||
| start: int | ||
| end: int | ||
| entity_type: str | ||
|
|
||
|
|
||
| @dataclass(frozen=True, slots=True) | ||
| class OperatorConfig: | ||
| """Configuration for an anonymization operator. | ||
|
|
||
| Args: | ||
| operator_name: Name of the operator (e.g., "replace"). | ||
| params: Parameters for the operator (e.g., {"new_value": "<EMAIL>"}). | ||
| """ | ||
|
|
||
| operator_name: str | ||
| params: dict[str, Any] | ||
|
|
||
|
|
||
| @dataclass(frozen=True, slots=True) | ||
| class AnonymizeResult: | ||
| """Result of text anonymization. | ||
|
|
||
| Attributes: | ||
| text: The anonymized text with entities masked. | ||
| """ | ||
|
|
||
| text: str | ||
|
|
||
|
|
||
| def _resolve_overlaps(results: Sequence[RecognizerResult]) -> list[RecognizerResult]: | ||
| """Remove overlapping entity spans, keeping longer/earlier ones. | ||
|
|
||
| When entities overlap, prioritize: | ||
| 1. Longer spans over shorter ones | ||
| 2. Earlier positions when spans are equal length | ||
|
|
||
| Args: | ||
| results: Sequence of recognizer results to resolve. | ||
|
|
||
| Returns: | ||
| List of non-overlapping recognizer results. | ||
|
|
||
| Examples: | ||
| >>> # If EMAIL_ADDRESS spans (0, 20) and PERSON spans (5, 10), keep EMAIL_ADDRESS | ||
| >>> # If two entities span (0, 10) and (5, 15), keep the one starting at 0 | ||
| """ | ||
| if not results: | ||
| return [] | ||
|
|
||
| # Sort by: 1) longer spans first, 2) earlier position for equal lengths | ||
| sorted_results = sorted( | ||
| results, | ||
| key=lambda r: (-(r.end - r.start), r.start), | ||
| ) | ||
|
|
||
| # Filter out overlapping spans | ||
| non_overlapping: list[RecognizerResult] = [] | ||
| for result in sorted_results: | ||
| # Check if this result overlaps with any already selected | ||
| overlaps = False | ||
| for selected in non_overlapping: | ||
| # Two spans overlap if one starts before the other ends | ||
| if (result.start < selected.end and result.end > selected.start): | ||
| overlaps = True | ||
| break | ||
|
|
||
| if not overlaps: | ||
| non_overlapping.append(result) | ||
|
|
||
| return non_overlapping | ||
|
|
||
|
|
||
| def anonymize( | ||
| text: str, | ||
| analyzer_results: Sequence[RecognizerResult], | ||
| operators: dict[str, OperatorConfig], | ||
| ) -> AnonymizeResult: | ||
| """Anonymize text by replacing detected entities with placeholders. | ||
|
|
||
| This function replicates presidio-anonymizer's behavior for the "replace" | ||
| operator, which we use to mask PII with placeholders like "<EMAIL_ADDRESS>". | ||
|
|
||
| Args: | ||
| text: The original text to anonymize. | ||
| analyzer_results: Sequence of detected entities with positions. | ||
| operators: Mapping from entity type to operator configuration. | ||
|
|
||
| Returns: | ||
| AnonymizeResult with masked text. | ||
|
|
||
| Examples: | ||
| >>> from collections import namedtuple | ||
| >>> Result = namedtuple("Result", ["start", "end", "entity_type"]) | ||
| >>> results = [Result(start=10, end=25, entity_type="EMAIL_ADDRESS")] | ||
| >>> operators = {"EMAIL_ADDRESS": OperatorConfig("replace", {"new_value": "<EMAIL_ADDRESS>"})} | ||
| >>> result = anonymize("Contact: john@example.com", results, operators) | ||
| >>> result.text | ||
| 'Contact: <EMAIL_ADDRESS>' | ||
| """ | ||
| if not analyzer_results or not text: | ||
| return AnonymizeResult(text=text) | ||
|
|
||
| # Resolve overlapping entities | ||
| non_overlapping = _resolve_overlaps(analyzer_results) | ||
|
|
||
| # Sort by position (reverse order) to maintain correct offsets during replacement | ||
| sorted_results = sorted(non_overlapping, key=lambda r: r.start, reverse=True) | ||
|
|
||
| # Replace entities from end to start | ||
| masked_text = text | ||
| for result in sorted_results: | ||
| entity_type = result.entity_type | ||
| operator_config = operators.get(entity_type) | ||
|
|
||
| if operator_config and operator_config.operator_name == "replace": | ||
| # Extract the replacement value | ||
| new_value = operator_config.params.get("new_value", f"<{entity_type}>") | ||
| # Replace the text span | ||
| masked_text = ( | ||
| masked_text[: result.start] | ||
| + new_value | ||
| + masked_text[result.end :] | ||
| ) | ||
|
|
||
| return AnonymizeResult(text=masked_text) | ||
|
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
"HSBC" is duplicated in the known_bank_codes string. It already appears in line 176 as part of the "Major international" banks. Consider removing the duplicate "HSBC" from the Hong Kong section to avoid redundancy.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[nit]