🔒 fix(observability): mascarar dados sensiveis em logs e historico
Centraliza a sanitizacao de cpf, placa, external_id e identificadores de comprovante para evitar que esses dados sejam persistidos crus no historico conversacional e nos logs operacionais. Aplica a mascaracao no trace do orquestrador, na auditoria de turnos e no tratamento de erros do satellite do Telegram, preservando apenas fragmentos uteis para troubleshooting. Amplia a cobertura com testes para persistencia mascarada, leitura de registros legados e payloads de log sanitizados, mantendo a suite completa verde com 209 testes.
main
parent
a3525334ad
commit
a5b28182d9
@ -0,0 +1,144 @@
|
||||
import re
|
||||
from typing import Any
|
||||
|
||||
|
||||
_CPF_PATTERN = re.compile(r"(?<!\d)(\d{3}\.?\d{3}\.?\d{3}-?\d{2})(?!\d)")
|
||||
_PLATE_PATTERN = re.compile(r"(?<![A-Za-z0-9])([A-Za-z]{3}\d{4}|[A-Za-z]{3}\d[A-Za-z]\d{2})(?![A-Za-z0-9])")
|
||||
_LABELED_EXTERNAL_ID_PATTERN = re.compile(
|
||||
r'(?i)(["\']?external_id["\']?\s*[:=]\s*["\']?)([A-Za-z0-9._:-]{4,})'
|
||||
)
|
||||
_LABELED_RECEIPT_IDENTIFIER_PATTERN = re.compile(
|
||||
r'(?i)(["\']?(?:identificador(?:_?do)?_?comprovante|comprovante_id|receipt_id|receipt_identifier|nsu|transaction_id|pix_e2e_id|end_to_end_id)["\']?\s*[:=]\s*["\']?)([A-Za-z0-9._:-]{4,})'
|
||||
)
|
||||
|
||||
_CPF_KEYS = {
|
||||
"cpf",
|
||||
"customer_cpf",
|
||||
"cpf_cliente",
|
||||
}
|
||||
_PLATE_KEYS = {
|
||||
"placa",
|
||||
"placa_veiculo",
|
||||
"vehicle_plate",
|
||||
"plate",
|
||||
}
|
||||
_EXTERNAL_ID_KEYS = {
|
||||
"external_id",
|
||||
}
|
||||
_RECEIPT_IDENTIFIER_KEYS = {
|
||||
"identificador_comprovante",
|
||||
"comprovante_id",
|
||||
"receipt_id",
|
||||
"receipt_identifier",
|
||||
"nsu",
|
||||
"transaction_id",
|
||||
"pix_e2e_id",
|
||||
"end_to_end_id",
|
||||
}
|
||||
|
||||
|
||||
def mask_sensitive_text(value: str | None) -> str | None:
    """Mask sensitive fragments found inside free-form text.

    Labeled external ids and receipt identifiers are masked first (keeping the
    label and the last three characters of the value), then CPF-shaped and
    plate-shaped tokens.

    Args:
        value: Text to sanitize; non-string values are converted with ``str``.

    Returns:
        ``None`` when *value* is ``None``, the empty string unchanged,
        otherwise the text with every match replaced by its masked form.
    """
    if value is None:
        return None

    text = str(value)
    if not text:
        return text

    def _mask_labeled(match: re.Match) -> str:
        # Group 1 is the label/separator prefix, group 2 the raw identifier.
        return f"{match.group(1)}{_mask_identifier_value(match.group(2), suffix=3)}"

    for labeled_pattern in (
        _LABELED_EXTERNAL_ID_PATTERN,
        _LABELED_RECEIPT_IDENTIFIER_PATTERN,
    ):
        text = labeled_pattern.sub(_mask_labeled, text)

    text = _CPF_PATTERN.sub(lambda found: _mask_cpf_value(found.group(1)), text)
    return _PLATE_PATTERN.sub(lambda found: _mask_plate_value(found.group(1)), text)
|
||||
|
||||
|
||||
def mask_sensitive_payload(value: Any, *, key: str | None = None) -> Any:
    """Return a copy of *value* with sensitive data masked.

    Containers are walked recursively and rebuilt with the same type (dict,
    list, tuple or set).  Scalars stored under a sensitive key are masked
    according to that key's kind; other strings go through
    ``mask_sensitive_text``; every other value is returned unchanged.

    Containers are inspected *before* the key is applied: masking a list or
    dict by kind would stringify the whole container, destroying its
    structure and merging or leaking fragments of its contents.

    Args:
        value: Arbitrary payload (scalar or nested containers).
        key: Name under which *value* was stored, if any.

    Returns:
        The masked payload.
    """
    key_kind = _classify_sensitive_key(key)

    if isinstance(value, dict):
        masked_items = {}
        for item_key, item_value in value.items():
            effective_key = item_key
            # Inside a sensitive container, a nested key that is not itself
            # sensitive inherits the parent key so its value is still masked
            # by kind rather than by best-effort text patterns.
            if key_kind is not None and _classify_sensitive_key(item_key) is None:
                effective_key = key
            masked_items[item_key] = mask_sensitive_payload(item_value, key=effective_key)
        return masked_items
    if isinstance(value, list):
        return [mask_sensitive_payload(item, key=key) for item in value]
    if isinstance(value, tuple):
        return tuple(mask_sensitive_payload(item, key=key) for item in value)
    if isinstance(value, set):
        return {mask_sensitive_payload(item, key=key) for item in value}

    if key_kind is not None:
        return _mask_value_by_kind(value, key_kind)
    if isinstance(value, str):
        return mask_sensitive_text(value)
    return value
|
||||
|
||||
|
||||
def _classify_sensitive_key(key: str | None) -> str | None:
    """Map a payload key to the kind of sensitive value it holds.

    The key is normalized first.  CPF and plate keys match either an exact
    entry of their key set or a ``_cpf`` / ``_placa`` / ``_plate`` suffix;
    external-id and receipt-identifier keys match exact entries only.

    Returns:
        ``"cpf"``, ``"placa"``, ``"external_id"``, ``"receipt_identifier"``,
        or ``None`` when the key is empty or not sensitive.
    """
    name = _normalize_key(key)
    if not name:
        return None

    if name.endswith("_cpf") or name in _CPF_KEYS:
        return "cpf"
    if name.endswith(("_placa", "_plate")) or name in _PLATE_KEYS:
        return "placa"

    exact_match_kinds = (
        ("external_id", _EXTERNAL_ID_KEYS),
        ("receipt_identifier", _RECEIPT_IDENTIFIER_KEYS),
    )
    for kind, known_keys in exact_match_kinds:
        if name in known_keys:
            return kind
    return None
|
||||
|
||||
|
||||
def _normalize_key(key: str | None) -> str:
|
||||
return re.sub(r"[^a-z0-9]+", "_", str(key or "").strip().lower()).strip("_")
|
||||
|
||||
|
||||
def _mask_value_by_kind(value: Any, kind: str) -> str | None:
    """Mask a single value according to the sensitive *kind* of its key.

    Args:
        value: Raw value; converted with ``str`` and stripped.
        kind: One of the kinds produced by ``_classify_sensitive_key``.

    Returns:
        ``None`` for ``None``; the stripped text unchanged when it is empty
        or already contains ``*`` (treated as previously masked); otherwise
        the masked text.  Unknown kinds fall back to ``mask_sensitive_text``.
    """
    if value is None:
        return None

    text = str(value).strip()
    # Nothing to hide, or the value already carries a mask.
    if not text or "*" in text:
        return text

    if kind in ("external_id", "receipt_identifier"):
        return _mask_identifier_value(text, suffix=3)

    maskers = {"cpf": _mask_cpf_value, "placa": _mask_plate_value}
    masker = maskers.get(kind, mask_sensitive_text)
    return masker(text)
|
||||
|
||||
|
||||
def _mask_cpf_value(value: str) -> str:
|
||||
if "*" in value:
|
||||
return value
|
||||
digits = re.sub(r"\D", "", str(value or ""))
|
||||
if len(digits) >= 2:
|
||||
return f"***.***.***-{digits[-2:]}"
|
||||
return "***.***.***-**"
|
||||
|
||||
|
||||
def _mask_plate_value(value: str) -> str:
|
||||
if "*" in value:
|
||||
return value
|
||||
normalized = re.sub(r"[^A-Za-z0-9]", "", str(value or "")).upper()
|
||||
if not normalized:
|
||||
return "***"
|
||||
if len(normalized) <= 4:
|
||||
return "***"
|
||||
hidden_count = max(len(normalized) - 4, 3)
|
||||
return f"{normalized[:3]}{'*' * hidden_count}{normalized[-1:]}"
|
||||
|
||||
|
||||
def _mask_identifier_value(value: str, *, suffix: int = 3) -> str:
|
||||
if "*" in value:
|
||||
return value
|
||||
text = str(value or "").strip()
|
||||
if not text:
|
||||
return text
|
||||
if len(text) <= suffix:
|
||||
return "*" * max(len(text), 3)
|
||||
hidden_count = max(len(text) - suffix, 3)
|
||||
return f"{'*' * hidden_count}{text[-suffix:]}"
|
||||
Loading…
Reference in New Issue