diff --git a/app/integrations/telegram_satellite_service.py b/app/integrations/telegram_satellite_service.py index bb7da21..f3b14d0 100644 --- a/app/integrations/telegram_satellite_service.py +++ b/app/integrations/telegram_satellite_service.py @@ -15,6 +15,7 @@ from app.services.ai.llm_service import ( LLMService, ) from app.services.orchestration.orquestrador_service import OrquestradorService +from app.services.orchestration.sensitive_data import mask_sensitive_payload from app.services.user.user_service import UserService @@ -322,7 +323,7 @@ class TelegramSatelliteService: image_attachments=image_attachments, ) except HTTPException as exc: - logger.warning("Falha de dominio ao processar mensagem no Telegram: %s", exc.detail) + logger.warning("Falha de dominio ao processar mensagem no Telegram: %s", mask_sensitive_payload(exc.detail)) answer = str(exc.detail) if exc.detail else "Nao foi possivel concluir a operacao solicitada." except Exception: logger.exception("Erro ao processar mensagem do Telegram.") @@ -516,3 +517,4 @@ async def main() -> None: if __name__ == "__main__": asyncio.run(main()) + diff --git a/app/services/orchestration/conversation_history_service.py b/app/services/orchestration/conversation_history_service.py index 64c8427..bbce6ca 100644 --- a/app/services/orchestration/conversation_history_service.py +++ b/app/services/orchestration/conversation_history_service.py @@ -5,6 +5,7 @@ from typing import Any from app.db.mock_database import SessionMockLocal from app.db.mock_models import ConversationTurn, User +from app.services.orchestration.sensitive_data import mask_sensitive_payload, mask_sensitive_text logger = logging.getLogger(__name__) @@ -49,17 +50,17 @@ class ConversationHistoryService: "conversation_id": str(conversation_id or "anonymous"), "user_id": user_id, "channel": channel, - "external_id": external_id, + "external_id": mask_sensitive_payload(external_id, key="external_id"), "username": username, - "user_message": str(user_message or ""), - "assistant_response": assistant_response, + "user_message": mask_sensitive_text(str(user_message or "")), + "assistant_response": mask_sensitive_text(assistant_response), "turn_status": str(turn_status or "completed"), "intent": self._clean_text(intent), "domain": self._clean_text(domain), "action": self._clean_text(action), "tool_name": self._clean_text(tool_name), - "tool_arguments": self._serialize_json(tool_arguments), - "error_detail": error_detail, + "tool_arguments": self._serialize_json(mask_sensitive_payload(tool_arguments)), + "error_detail": mask_sensitive_text(error_detail), } if started_at is not None: payload["started_at"] = started_at @@ -133,17 +134,17 @@ class ConversationHistoryService: "conversation_id": row.conversation_id, "user_id": row.user_id, "channel": row.channel, - "external_id": row.external_id, + "external_id": mask_sensitive_payload(row.external_id, key="external_id"), "username": row.username, - "user_message": row.user_message, - "assistant_response": row.assistant_response, + "user_message": mask_sensitive_text(row.user_message), + "assistant_response": mask_sensitive_text(row.assistant_response), "turn_status": row.turn_status, "intent": row.intent, "domain": row.domain, "action": row.action, "tool_name": row.tool_name, - "tool_arguments": self._deserialize_json(row.tool_arguments), - "error_detail": row.error_detail, + "tool_arguments": mask_sensitive_payload(self._deserialize_json(row.tool_arguments)), + "error_detail": mask_sensitive_text(row.error_detail), "elapsed_ms": row.elapsed_ms, "started_at": row.started_at.isoformat() if row.started_at else None, "completed_at": row.completed_at.isoformat() if row.completed_at else None, diff --git a/app/services/orchestration/orquestrador_service.py b/app/services/orchestration/orquestrador_service.py index 4eb53ca..c04d0af 100644 --- a/app/services/orchestration/orquestrador_service.py +++ b/app/services/orchestration/orquestrador_service.py @@ -21,6 +21,7 @@ from app.services.orchestration.conversation_state_repository import Conversatio from app.services.orchestration.entity_normalizer import EntityNormalizer from app.services.orchestration.message_planner import MessagePlanner from app.services.orchestration.conversation_history_service import ConversationHistoryService +from app.services.orchestration.sensitive_data import mask_sensitive_payload, mask_sensitive_text from app.services.orchestration.state_repository_factory import get_conversation_state_repository from app.services.flows.order_flow import OrderFlowMixin from app.services.flows.rental_flow import RentalFlowMixin @@ -2582,7 +2583,7 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin, RentalFlowMixin): if not isinstance(trace, dict): return trace["tool_name"] = str(tool_name or "").strip() or None - trace["tool_arguments"] = dict(arguments or {}) if isinstance(arguments, dict) else None + trace["tool_arguments"] = mask_sensitive_payload(dict(arguments or {})) if isinstance(arguments, dict) else None def _finalize_turn_history( self, @@ -2619,20 +2620,23 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin, RentalFlowMixin): if isinstance(exc, HTTPException): detail = exc.detail if isinstance(detail, dict): - return json.dumps(detail, ensure_ascii=True, separators=(",", ":"), default=str) - return str(detail) - return f"{type(exc).__name__}: {exc}" + return json.dumps(mask_sensitive_payload(detail), ensure_ascii=True, separators=(",", ":"), default=str) + return str(mask_sensitive_text(str(detail))) + return str(mask_sensitive_text(f"{type(exc).__name__}: {exc}")) def _log_turn_event(self, event: str, **payload) -> None: trace = getattr(self, "_turn_trace", {}) or {} - logger.info( - "turn_event=%s payload=%s", - event, + safe_payload = mask_sensitive_payload( { "request_id": trace.get("request_id"), "conversation_id": trace.get("conversation_id"), **payload, - }, + } + ) + logger.info( + "turn_event=%s payload=%s", + event, + safe_payload, ) async def _call_llm_with_trace(self, operation: str, message: str, tools): @@ -2788,3 +2792,5 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin, RentalFlowMixin): def _fallback_format_tool_result(self, tool_name: str, tool_result) -> str: return self.tool_executor.fallback_format_tool_result(tool_name=tool_name, tool_result=tool_result) + + diff --git a/app/services/orchestration/sensitive_data.py b/app/services/orchestration/sensitive_data.py new file mode 100644 index 0000000..b50e30e --- /dev/null +++ b/app/services/orchestration/sensitive_data.py @@ -0,0 +1,144 @@ +import re +from typing import Any + + +_CPF_PATTERN = re.compile(r"(? str | None: + if value is None: + return None + text = str(value) + if not text: + return text + + masked = _LABELED_EXTERNAL_ID_PATTERN.sub( + lambda match: f"{match.group(1)}{_mask_identifier_value(match.group(2), suffix=3)}", + text, + ) + masked = _LABELED_RECEIPT_IDENTIFIER_PATTERN.sub( + lambda match: f"{match.group(1)}{_mask_identifier_value(match.group(2), suffix=3)}", + masked, + ) + masked = _CPF_PATTERN.sub(lambda match: _mask_cpf_value(match.group(1)), masked) + masked = _PLATE_PATTERN.sub(lambda match: _mask_plate_value(match.group(1)), masked) + return masked + + +def mask_sensitive_payload(value: Any, *, key: str | None = None) -> Any: + key_kind = _classify_sensitive_key(key) + if key_kind is not None: + return _mask_value_by_kind(value, key_kind) + + if isinstance(value, dict): + return {item_key: mask_sensitive_payload(item_value, key=item_key) for item_key, item_value in value.items()} + if isinstance(value, list): + return [mask_sensitive_payload(item, key=key) for item in value] + if isinstance(value, tuple): + return tuple(mask_sensitive_payload(item, key=key) for item in value) + if isinstance(value, set): + return {mask_sensitive_payload(item, key=key) for item in value} + if isinstance(value, str): + return mask_sensitive_text(value) + return value + + +def _classify_sensitive_key(key: str | None) -> str | None: + normalized = _normalize_key(key) + if not normalized: + return None + if normalized in _CPF_KEYS or normalized.endswith("_cpf"): + return "cpf" + if normalized in _PLATE_KEYS or normalized.endswith("_placa") or normalized.endswith("_plate"): + return "placa" + if normalized in _EXTERNAL_ID_KEYS: + return "external_id" + if normalized in _RECEIPT_IDENTIFIER_KEYS: + return "receipt_identifier" + return None + + +def _normalize_key(key: str | None) -> str: + return re.sub(r"[^a-z0-9]+", "_", str(key or "").strip().lower()).strip("_") + + +def _mask_value_by_kind(value: Any, kind: str) -> str | None: + if value is None: + return None + text = str(value).strip() + if not text: + return text + if "*" in text: + return text + if kind == "cpf": + return _mask_cpf_value(text) + if kind == "placa": + return _mask_plate_value(text) + if kind in {"external_id", "receipt_identifier"}: + return _mask_identifier_value(text, suffix=3) + return mask_sensitive_text(text) + + +def _mask_cpf_value(value: str) -> str: + if "*" in value: + return value + digits = re.sub(r"\D", "", str(value or "")) + if len(digits) >= 2: + return f"***.***.***-{digits[-2:]}" + return "***.***.***-**" + + +def _mask_plate_value(value: str) -> str: + if "*" in value: + return value + normalized = re.sub(r"[^A-Za-z0-9]", "", str(value or "")).upper() + if not normalized: + return "***" + if len(normalized) <= 4: + return "***" + hidden_count = max(len(normalized) - 4, 3) + return f"{normalized[:3]}{'*' * hidden_count}{normalized[-1:]}" + + +def _mask_identifier_value(value: str, *, suffix: int = 3) -> str: + if "*" in value: + return value + text = str(value or "").strip() + if not text: + return text + if len(text) <= suffix: + return "*" * max(len(text), 3) + hidden_count = max(len(text) - suffix, 3) + return f"{'*' * hidden_count}{text[-suffix:]}" diff --git a/tests/test_conversation_history_service.py b/tests/test_conversation_history_service.py index 4d20d50..7cf7e8a 100644 --- a/tests/test_conversation_history_service.py +++ b/tests/test_conversation_history_service.py @@ -90,7 +90,7 @@ class ConversationHistoryServiceTests(unittest.TestCase): self.assertEqual(record.conversation_id, "user:7") self.assertEqual(record.user_id, 7) self.assertEqual(record.channel, "telegram") - self.assertEqual(record.external_id, "12345") + self.assertEqual(record.external_id, "***345") self.assertEqual(record.username, "cliente_teste") self.assertEqual(record.intent, "order_create") self.assertEqual(record.domain, "sales") @@ -101,6 +101,64 @@ class ConversationHistoryServiceTests(unittest.TestCase): self.assertEqual(record.completed_at, completed_at) self.assertEqual(record.elapsed_ms, 512.4) + def test_record_turn_masks_sensitive_fields_before_persisting(self): + session = _FakeSession( + user=SimpleNamespace( + id=7, + channel="telegram", + external_id="987654321", + username="cliente_teste", + ) + ) + service = ConversationHistoryService() + + with patch( + "app.services.orchestration.conversation_history_service.SessionMockLocal", + return_value=session, + ): + service.record_turn( + request_id="req-sensitive", + conversation_id="user:7", + user_id=7, + user_message="Meu cpf 123.456.789-09 e a placa ABC1D23.", + assistant_response="Recebi o identificador_comprovante=NSU123 para a placa ABC1D23.", + turn_status="failed", + intent="rental_payment", + domain="rental", + action="call_tool", + tool_name="registrar_pagamento_aluguel", + tool_arguments={ + "cpf": "12345678909", + "placa": "ABC1D23", + "external_id": "987654321", + "identificador_comprovante": "NSU123", + "nested": { + "placa": "ABC1D23", + }, + }, + error_detail='{"external_id":"987654321","placa":"ABC1D23","identificador_comprovante":"NSU123"}', + ) + + record = session.added[0] + self.assertNotIn("123.456.789-09", record.user_message) + self.assertNotIn("ABC1D23", record.user_message) + self.assertIn("***.***.***-09", record.user_message) + self.assertIn("ABC***3", record.user_message) + self.assertNotIn("NSU123", record.assistant_response) + self.assertIn("***123", record.assistant_response) + self.assertEqual(record.external_id, "******321") + self.assertNotIn("12345678909", record.tool_arguments) + self.assertNotIn("ABC1D23", record.tool_arguments) + self.assertNotIn("987654321", record.tool_arguments) + self.assertNotIn("NSU123", record.tool_arguments) + self.assertIn("***.***.***-09", record.tool_arguments) + self.assertIn("ABC***3", record.tool_arguments) + self.assertIn("******321", record.tool_arguments) + self.assertIn("***123", record.tool_arguments) + self.assertNotIn("987654321", record.error_detail) + self.assertNotIn("ABC1D23", record.error_detail) + self.assertNotIn("NSU123", record.error_detail) + def test_list_turns_filters_and_orders_recent_first(self): engine = create_engine("sqlite:///:memory:") SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine) @@ -191,6 +249,56 @@ class ConversationHistoryServiceTests(unittest.TestCase): self.assertEqual(items[0]["tool_arguments"], {"vehicle_id": 1}) self.assertEqual(items[0]["turn_status"], "completed") self.assertEqual(items[0]["user_id"], user_id) + self.assertEqual(items[0]["external_id"], "***") + + def test_list_turns_masks_legacy_sensitive_fields_on_read(self): + engine = create_engine("sqlite:///:memory:") + SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine) + MockBase.metadata.create_all(bind=engine) + self.addCleanup(engine.dispose) + + db = SessionLocal() + db.add( + ConversationTurn( + request_id="req-sensitive", + conversation_id="user:42", + user_id=42, + channel="telegram", + external_id="987654321", + username="cliente", + user_message="Cpf 12345678909 e placa ABC1234.", + assistant_response="identificador_comprovante=NSU123 recebido para ABC1234", + turn_status="completed", + tool_name="registrar_pagamento_aluguel", + tool_arguments='{"cpf":"12345678909","placa":"ABC1234","external_id":"987654321","identificador_comprovante":"NSU123"}', + error_detail='{"placa":"ABC1234","external_id":"987654321","identificador_comprovante":"NSU123"}', + started_at=datetime(2026, 3, 16, 13, 0, 0), + completed_at=datetime(2026, 3, 16, 13, 0, 1), + ) + ) + db.commit() + db.close() + + service = ConversationHistoryService() + with patch( + "app.services.orchestration.conversation_history_service.SessionMockLocal", + SessionLocal, + ): + items = service.list_turns(request_id="req-sensitive", limit=5) + + self.assertEqual(len(items), 1) + item = items[0] + self.assertEqual(item["external_id"], "******321") + self.assertNotIn("12345678909", item["user_message"]) + self.assertNotIn("ABC1234", item["user_message"]) + self.assertNotIn("NSU123", item["assistant_response"]) + self.assertEqual(item["tool_arguments"]["cpf"], "***.***.***-09") + self.assertEqual(item["tool_arguments"]["placa"], "ABC***4") + self.assertEqual(item["tool_arguments"]["external_id"], "******321") + self.assertEqual(item["tool_arguments"]["identificador_comprovante"], "***123") + self.assertNotIn("ABC1234", item["error_detail"]) + self.assertNotIn("987654321", item["error_detail"]) + self.assertNotIn("NSU123", item["error_detail"]) def test_list_turns_can_filter_by_request_id(self): engine = create_engine("sqlite:///:memory:") diff --git a/tests/test_telegram_multimodal.py b/tests/test_telegram_multimodal.py index c4bfdad..ae4b636 100644 --- a/tests/test_telegram_multimodal.py +++ b/tests/test_telegram_multimodal.py @@ -3,6 +3,8 @@ import asyncio from types import SimpleNamespace from unittest.mock import AsyncMock, patch +from fastapi import HTTPException + from app.integrations.telegram_satellite_service import TelegramSatelliteService @@ -107,6 +109,48 @@ class TelegramMultimodalTests(unittest.IsolatedAsyncioTestCase): self.assertIn("marca d'agua SysaltiIA visivel", answer) self.assertFalse(orchestrator_cls.return_value.handle_message.await_count) + async def test_handle_update_masks_sensitive_domain_error_in_logs(self): + service = TelegramSatelliteService("token-teste") + self._service_under_test = service + update = { + "update_id": 1, + "message": { + "chat": {"id": 99}, + "from": {"id": 99}, + "text": "segue o pagamento", + }, + } + + with patch.object(service, "_extract_image_attachments", AsyncMock(return_value=[])), patch.object( + service, + "_process_message", + AsyncMock( + side_effect=HTTPException( + status_code=409, + detail={ + "cpf": "12345678909", + "placa": "ABC1D23", + "external_id": "987654321", + "identificador_comprovante": "NSU123", + }, + ) + ), + ), patch.object(service, "_send_message", AsyncMock()), patch( + "app.integrations.telegram_satellite_service.logger.warning" + ) as logger_warning: + await service._handle_update(session=SimpleNamespace(), update=update) + + self.assertTrue(logger_warning.called) + logged_detail = str(logger_warning.call_args.args[1]) + self.assertNotIn("12345678909", logged_detail) + self.assertNotIn("ABC1D23", logged_detail) + self.assertNotIn("987654321", logged_detail) + self.assertNotIn("NSU123", logged_detail) + self.assertIn("***.***.***-09", logged_detail) + self.assertIn("ABC***3", logged_detail) + self.assertIn("******321", logged_detail) + self.assertIn("***123", logged_detail) + async def test_schedule_update_processing_allows_parallel_chats(self): service = TelegramSatelliteService("token-teste") self._service_under_test = service @@ -171,3 +215,4 @@ class TelegramMultimodalTests(unittest.IsolatedAsyncioTestCase): await asyncio.wait_for(second_started.wait(), timeout=1) self.assertEqual(started_updates, [1, 2]) + diff --git a/tests/test_turn_decision_contract.py b/tests/test_turn_decision_contract.py index 3a87e8f..f49b478 100644 --- a/tests/test_turn_decision_contract.py +++ b/tests/test_turn_decision_contract.py @@ -1,6 +1,7 @@ import os import unittest from types import SimpleNamespace +from unittest.mock import patch os.environ.setdefault("DEBUG", "false") @@ -1354,6 +1355,41 @@ class TurnDecisionContractTests(unittest.IsolatedAsyncioTestCase): self.assertIn("RuntimeError", history_calls[0]["error_detail"]) self.assertIn("falha controlada no turno", history_calls[0]["error_detail"]) + def test_log_turn_event_masks_sensitive_payload(self): + service = OrquestradorService.__new__(OrquestradorService) + service._turn_trace = { + "request_id": "req-1", + "conversation_id": "user:7", + } + + with patch("app.services.orchestration.orquestrador_service.logger.info") as logger_info: + service._log_turn_event( + "tool_completed", + message="Meu cpf 12345678909 e a placa ABC1D23.", + arguments={ + "cpf": "12345678909", + "placa": "ABC1D23", + "external_id": "987654321", + "identificador_comprovante": "NSU123", + }, + result={ + "placa": "ABC1D23", + "identificador_comprovante": "NSU123", + }, + ) + + self.assertTrue(logger_info.called) + payload = logger_info.call_args.args[2] + payload_text = str(payload) + self.assertNotIn("12345678909", payload_text) + self.assertNotIn("ABC1D23", payload_text) + self.assertNotIn("987654321", payload_text) + self.assertNotIn("NSU123", payload_text) + self.assertIn("***.***.***-09", payload_text) + self.assertIn("ABC***3", payload_text) + self.assertIn("******321", payload_text) + self.assertIn("***123", payload_text) + async def test_handle_message_prioritizes_order_flow_over_model_answer_for_purchase_intent(self): state = FakeState( contexts={ @@ -4107,3 +4143,5 @@ class TurnDecisionContractTests(unittest.IsolatedAsyncioTestCase): if __name__ == "__main__": unittest.main() + +