You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
orquestrador/app/services/orchestration/sensitive_data.py

145 lines
4.4 KiB
Python

import re
from typing import Any
# Matches a CPF-shaped number: 11 digits, optionally punctuated as
# 000.000.000-00 (each dot and the dash are independently optional), that is
# not directly adjacent to another digit. Group 1 is the whole number.
_CPF_PATTERN = re.compile(r"(?<!\d)(\d{3}\.?\d{3}\.?\d{3}-?\d{2})(?!\d)")
# Matches a plate-shaped token: three letters followed by either four digits
# or digit-letter-digit-digit, not directly adjacent to another ASCII letter
# or digit. Group 1 is the whole token.
_PLATE_PATTERN = re.compile(r"(?<![A-Za-z0-9])([A-Za-z]{3}\d{4}|[A-Za-z]{3}\d[A-Za-z]\d{2})(?![A-Za-z0-9])")
# Matches an `external_id` label (case-insensitive, optionally quoted) followed
# by `:` or `=` and a value of at least four characters drawn from
# letters, digits, `.`, `_`, `:` and `-`.
# Group 1 is the label plus separator (and any opening quote); group 2 is the value.
_LABELED_EXTERNAL_ID_PATTERN = re.compile(
r'(?i)(["\']?external_id["\']?\s*[:=]\s*["\']?)([A-Za-z0-9._:-]{4,})'
)
# Same structure as the external-id pattern, but for receipt / transaction
# identifier labels (identificador[_do]_comprovante, comprovante_id, receipt_id,
# receipt_identifier, nsu, transaction_id, pix_e2e_id, end_to_end_id).
# Group 1 is the label plus separator; group 2 is the value.
_LABELED_RECEIPT_IDENTIFIER_PATTERN = re.compile(
r'(?i)(["\']?(?:identificador(?:_?do)?_?comprovante|comprovante_id|receipt_id|receipt_identifier|nsu|transaction_id|pix_e2e_id|end_to_end_id)["\']?\s*[:=]\s*["\']?)([A-Za-z0-9._:-]{4,})'
)
_CPF_KEYS = {
"cpf",
"customer_cpf",
"cpf_cliente",
}
_PLATE_KEYS = {
"placa",
"placa_veiculo",
"vehicle_plate",
"plate",
}
_EXTERNAL_ID_KEYS = {
"external_id",
}
_RECEIPT_IDENTIFIER_KEYS = {
"identificador_comprovante",
"comprovante_id",
"receipt_id",
"receipt_identifier",
"nsu",
"transaction_id",
"pix_e2e_id",
"end_to_end_id",
}
def mask_sensitive_text(value: str | None) -> str | None:
    """Mask sensitive fragments found inside free-form text.

    Labeled external/receipt identifiers are masked first (keeping the label
    and the last three characters of the value), then CPF-shaped numbers,
    then plate-shaped tokens.

    Args:
        value: Text to scan; coerced with ``str()``.

    Returns:
        ``None`` when *value* is ``None``, the empty string unchanged,
        otherwise the text with every match replaced by its masked form.
    """
    if value is None:
        return None
    result = str(value)
    if not result:
        return result

    def _redact_labeled(found):
        # Group 1 is the label and separator, group 2 the identifier itself.
        label, identifier = found.group(1), found.group(2)
        return label + _mask_identifier_value(identifier, suffix=3)

    for labeled_pattern in (
        _LABELED_EXTERNAL_ID_PATTERN,
        _LABELED_RECEIPT_IDENTIFIER_PATTERN,
    ):
        result = labeled_pattern.sub(_redact_labeled, result)

    result = _CPF_PATTERN.sub(lambda found: _mask_cpf_value(found.group(1)), result)
    result = _PLATE_PATTERN.sub(lambda found: _mask_plate_value(found.group(1)), result)
    return result
def mask_sensitive_payload(value: Any, *, key: str | None = None) -> Any:
    """Return a copy of *value* with sensitive data masked.

    Lists, tuples and sets are rebuilt element by element, each element
    inheriting *key*. Dicts are rebuilt with every entry masked under its own
    key. A value stored under a sensitive key is masked according to that
    key's kind; any other string is scanned with ``mask_sensitive_text``.
    Everything else is returned unchanged.

    Args:
        value: Arbitrary payload (scalar or nested containers).
        key: Name of the field *value* was stored under, if any.

    Returns:
        The masked payload; list, tuple, set and dict inputs come back as the
        same kind of container.
    """
    # Sequences and sets are walked before the key check so that a collection
    # stored under a sensitive key is masked item by item (with the key's
    # kind) rather than being stringified and collapsed into a single masked
    # string, which dropped every element but the tail of the last one.
    if isinstance(value, list):
        return [mask_sensitive_payload(item, key=key) for item in value]
    if isinstance(value, tuple):
        return tuple(mask_sensitive_payload(item, key=key) for item in value)
    if isinstance(value, set):
        return {mask_sensitive_payload(item, key=key) for item in value}

    key_kind = _classify_sensitive_key(key)
    if key_kind is not None:
        # Scalars (and dicts) under a sensitive key are masked as a whole, so
        # nothing nested under such a key can escape masking.
        return _mask_value_by_kind(value, key_kind)
    if isinstance(value, dict):
        return {
            item_key: mask_sensitive_payload(item_value, key=item_key)
            for item_key, item_value in value.items()
        }
    if isinstance(value, str):
        return mask_sensitive_text(value)
    return value
def _classify_sensitive_key(key: str | None) -> str | None:
    """Map a payload key to the kind of sensitive value it holds.

    Returns:
        ``"cpf"``, ``"placa"``, ``"external_id"`` or ``"receipt_identifier"``
        when the normalized key is a known name (or, for CPF and plate, ends
        with a known suffix); ``None`` otherwise.
    """
    canonical = _normalize_key(key)
    if not canonical:
        return None

    # (kind, exact key names, accepted suffixes) -- checked in this order.
    rules = (
        ("cpf", _CPF_KEYS, ("_cpf",)),
        ("placa", _PLATE_KEYS, ("_placa", "_plate")),
        ("external_id", _EXTERNAL_ID_KEYS, ()),
        ("receipt_identifier", _RECEIPT_IDENTIFIER_KEYS, ()),
    )
    for kind, exact_names, suffixes in rules:
        # str.endswith with an empty tuple is always False.
        if canonical in exact_names or canonical.endswith(suffixes):
            return kind
    return None
def _normalize_key(key: str | None) -> str:
return re.sub(r"[^a-z0-9]+", "_", str(key or "").strip().lower()).strip("_")
def _mask_value_by_kind(value: Any, kind: str) -> str | None:
    """Mask a single value according to the sensitive *kind* of its key.

    Args:
        value: The raw value; coerced with ``str()`` and stripped.
        kind: One of ``"cpf"``, ``"placa"``, ``"external_id"`` or
            ``"receipt_identifier"``; any other kind falls back to the
            free-text masker.

    Returns:
        ``None`` for ``None``; the stripped text unchanged when it is empty or
        already contains ``*`` (presumably already masked); otherwise the
        masked text.
    """
    if value is None:
        return None
    stripped = str(value).strip()
    if not stripped or "*" in stripped:
        return stripped

    def _mask_identifier(raw):
        return _mask_identifier_value(raw, suffix=3)

    maskers = {
        "cpf": _mask_cpf_value,
        "placa": _mask_plate_value,
        "external_id": _mask_identifier,
        "receipt_identifier": _mask_identifier,
    }
    masker = maskers.get(kind, mask_sensitive_text)
    return masker(stripped)
def _mask_cpf_value(value: str) -> str:
if "*" in value:
return value
digits = re.sub(r"\D", "", str(value or ""))
if len(digits) >= 2:
return f"***.***.***-{digits[-2:]}"
return "***.***.***-**"
def _mask_plate_value(value: str) -> str:
if "*" in value:
return value
normalized = re.sub(r"[^A-Za-z0-9]", "", str(value or "")).upper()
if not normalized:
return "***"
if len(normalized) <= 4:
return "***"
hidden_count = max(len(normalized) - 4, 3)
return f"{normalized[:3]}{'*' * hidden_count}{normalized[-1:]}"
def _mask_identifier_value(value: str, *, suffix: int = 3) -> str:
if "*" in value:
return value
text = str(value or "").strip()
if not text:
return text
if len(text) <= suffix:
return "*" * max(len(text), 3)
hidden_count = max(len(text) - suffix, 3)
return f"{'*' * hidden_count}{text[-suffix:]}"