import re from typing import Any _CPF_PATTERN = re.compile(r"(? str | None: if value is None: return None text = str(value) if not text: return text masked = _LABELED_EXTERNAL_ID_PATTERN.sub( lambda match: f"{match.group(1)}{_mask_identifier_value(match.group(2), suffix=3)}", text, ) masked = _LABELED_RECEIPT_IDENTIFIER_PATTERN.sub( lambda match: f"{match.group(1)}{_mask_identifier_value(match.group(2), suffix=3)}", masked, ) masked = _CPF_PATTERN.sub(lambda match: _mask_cpf_value(match.group(1)), masked) masked = _PLATE_PATTERN.sub(lambda match: _mask_plate_value(match.group(1)), masked) return masked def mask_sensitive_payload(value: Any, *, key: str | None = None) -> Any: key_kind = _classify_sensitive_key(key) if key_kind is not None: return _mask_value_by_kind(value, key_kind) if isinstance(value, dict): return {item_key: mask_sensitive_payload(item_value, key=item_key) for item_key, item_value in value.items()} if isinstance(value, list): return [mask_sensitive_payload(item, key=key) for item in value] if isinstance(value, tuple): return tuple(mask_sensitive_payload(item, key=key) for item in value) if isinstance(value, set): return {mask_sensitive_payload(item, key=key) for item in value} if isinstance(value, str): return mask_sensitive_text(value) return value def _classify_sensitive_key(key: str | None) -> str | None: normalized = _normalize_key(key) if not normalized: return None if normalized in _CPF_KEYS or normalized.endswith("_cpf"): return "cpf" if normalized in _PLATE_KEYS or normalized.endswith("_placa") or normalized.endswith("_plate"): return "placa" if normalized in _EXTERNAL_ID_KEYS: return "external_id" if normalized in _RECEIPT_IDENTIFIER_KEYS: return "receipt_identifier" return None def _normalize_key(key: str | None) -> str: return re.sub(r"[^a-z0-9]+", "_", str(key or "").strip().lower()).strip("_") def _mask_value_by_kind(value: Any, kind: str) -> str | None: if value is None: return None text = str(value).strip() if not text: return text if "*" in text: return text if kind == "cpf": return _mask_cpf_value(text) if kind == "placa": return _mask_plate_value(text) if kind in {"external_id", "receipt_identifier"}: return _mask_identifier_value(text, suffix=3) return mask_sensitive_text(text) def _mask_cpf_value(value: str) -> str: if "*" in value: return value digits = re.sub(r"\D", "", str(value or "")) if len(digits) >= 2: return f"***.***.***-{digits[-2:]}" return "***.***.***-**" def _mask_plate_value(value: str) -> str: if "*" in value: return value normalized = re.sub(r"[^A-Za-z0-9]", "", str(value or "")).upper() if not normalized: return "***" if len(normalized) <= 4: return "***" hidden_count = max(len(normalized) - 4, 3) return f"{normalized[:3]}{'*' * hidden_count}{normalized[-1:]}" def _mask_identifier_value(value: str, *, suffix: int = 3) -> str: if "*" in value: return value text = str(value or "").strip() if not text: return text if len(text) <= suffix: return "*" * max(len(text), 3) hidden_count = max(len(text) - suffix, 3) return f"{'*' * hidden_count}{text[-suffix:]}"