From 31d02a7daad47ced44645271c8fe020455fb0c0a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Vitor=20Hugo=20Belorio=20Sim=C3=A3o?= Date: Tue, 24 Mar 2026 15:32:03 -0300 Subject: [PATCH] =?UTF-8?q?=F0=9F=93=A8=20feat(integrations):=20escalar=20?= =?UTF-8?q?entregas=20do=20Brevo=20com=20destinatario=20dinamico?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Consolidar as rotas de email por evento em um modelo global dinamico, resolvendo o destinatario a partir do cadastro do usuario e registrando recipient_email e recipient_name em cada entrega do outbox para melhorar rastreabilidade e operacao. Permitir captura opcional de email no Telegram, salvar o endereco no cadastro do usuario e reaproveitar esse dado em revisao, pedido e aluguel, incluindo prompts de consentimento e reenvio imediato do resumo apos a confirmacao. Ampliar a configuracao do provider Brevo e dos scripts operacionais com sender por rota, reply-to, cc, bcc, tags, headers, listagem de rotas e entregas, alem de migracoes de bootstrap e cobertura automatizada validada com 100 testes OK. --- .env.example | 13 + README.md | 32 ++ app/db/bootstrap.py | 25 ++ app/db/mock_models.py | 3 + app/repositories/user_repository.py | 21 ++ app/services/domain/rental_service.py | 5 +- app/services/flows/order_flow.py | 17 + app/services/flows/rental_flow.py | 13 + app/services/flows/review_flow.py | 7 + app/services/integrations/__init__.py | 8 +- app/services/integrations/providers.py | 128 ++++++- app/services/integrations/service.py | 309 +++++++++++++++- .../orchestration/conversation_state_store.py | 1 + .../orchestration/orquestrador_service.py | 245 ++++++++++-- scripts/list_integration_deliveries.py | 34 ++ scripts/list_integration_routes.py | 38 ++ scripts/process_integration_deliveries.py | 46 ++- scripts/upsert_integration_route.py | 94 +++++ tests/test_brevo_provider.py | 100 +++++ tests/test_conversation_state_store.py | 18 +- tests/test_integration_service.py | 350 +++++++++++++++++- tests/test_turn_decision_contract.py | 223 ++++++++++- 22 files changed, 1667 insertions(+), 63 deletions(-) create mode 100644 scripts/list_integration_deliveries.py create mode 100644 scripts/list_integration_routes.py create mode 100644 tests/test_brevo_provider.py diff --git a/.env.example b/.env.example index ebae699..eb830ff 100644 --- a/.env.example +++ b/.env.example @@ -62,3 +62,16 @@ CONVERSATION_STATE_TTL_MINUTES=60 REDIS_URL=redis://127.0.0.1:6379/0 REDIS_KEY_PREFIX=orquestrador REDIS_SOCKET_TIMEOUT_SECONDS=5 + +# ============================================ +# INTEGRACOES EXTERNAS +# ============================================ + +INTEGRATIONS_ENABLED=false +INTEGRATION_SYNC_DELIVERY_ENABLED=true + +BREVO_API_KEY= +BREVO_BASE_URL=https://api.brevo.com/v3 +BREVO_SENDER_EMAIL= +BREVO_SENDER_NAME=Orquestrador +BREVO_REQUEST_TIMEOUT_SECONDS=10 diff --git a/README.md b/README.md index 3ebe62d..9287c35 100644 --- a/README.md +++ b/README.md @@ -152,6 +152,10 @@ app/ mock_customer_service.py user_service.py scripts/ + list_integration_deliveries.py + list_integration_routes.py + process_integration_deliveries.py + upsert_integration_route.py stress_smoke.py tests/ ... @@ -305,6 +309,16 @@ Principais variaveis: - `REDIS_KEY_PREFIX` - `REDIS_SOCKET_TIMEOUT_SECONDS` +### Integracoes externas + +- `INTEGRATIONS_ENABLED` +- `INTEGRATION_SYNC_DELIVERY_ENABLED` +- `BREVO_API_KEY` +- `BREVO_BASE_URL` +- `BREVO_SENDER_EMAIL` +- `BREVO_SENDER_NAME` +- `BREVO_REQUEST_TIMEOUT_SECONDS` + ### Ambiente - `ENVIRONMENT` @@ -357,6 +371,24 @@ Se voce estiver usando um arquivo de ambiente dedicado: python -m dotenv -f .env.local run -- python scripts/stress_smoke.py --backend memory --state-iterations 200 --order-cycles 30 --race-attempts 8 --user-base 995000 --cpf 11144477735 ``` +Operacao basica do outbox de integracoes: + +```bash +python scripts/upsert_integration_route.py --event order.created --recipient ops@empresa.com +python scripts/list_integration_routes.py --enabled +python scripts/list_integration_deliveries.py --status failed --limit 20 +python scripts/process_integration_deliveries.py --status failed --limit 20 +``` + +Exemplo de configuracao mais completa da rota Brevo: + +```bash +python scripts/upsert_integration_route.py --event order.created --recipient ops@empresa.com --sender-email noreply@empresa.com --sender-name Operacoes --reply-to-email atendimento@empresa.com --reply-to-name Atendimento --cc financeiro@empresa.com --tag pedidos --tag operacao --header X-Canal=orquestrador +``` + +Campos aceitos no `provider_config` da rota: `sender`, `reply_to`, `cc`, `bcc`, `tags`, `headers` e `html_content`. +Para casos mais avancados, o script tambem aceita `--provider-config-json` com um objeto JSON. + ## Deploy O deploy de servidor fica documentado em [DEPLOY_SERVIDOR.md](DEPLOY_SERVIDOR.md). diff --git a/app/db/bootstrap.py b/app/db/bootstrap.py index 14785e1..d4dad3a 100644 --- a/app/db/bootstrap.py +++ b/app/db/bootstrap.py @@ -3,6 +3,8 @@ Rotina dedicada de bootstrap de banco de dados. Cria tabelas e executa seed inicial de forma explicita, fora do startup do app. """ +from sqlalchemy import inspect, text + from app.core.settings import settings from app.db.database import Base, engine from app.db.mock_database import MockBase, mock_engine @@ -24,6 +26,28 @@ from app.db.mock_seed import seed_mock_data from app.db.tool_seed import seed_tools +def _ensure_mock_schema_evolution() -> None: + inspector = inspect(mock_engine) + table_names = set(inspector.get_table_names()) + if "users" in table_names: + user_columns = {column["name"] for column in inspector.get_columns("users")} + if "email" not in user_columns: + with mock_engine.begin() as connection: + connection.execute(text("ALTER TABLE users ADD COLUMN email VARCHAR(255)")) + + if "integration_deliveries" in table_names: + delivery_columns = {column["name"] for column in inspector.get_columns("integration_deliveries")} + statements: list[str] = [] + if "recipient_email" not in delivery_columns: + statements.append("ALTER TABLE integration_deliveries ADD COLUMN recipient_email VARCHAR(255)") + if "recipient_name" not in delivery_columns: + statements.append("ALTER TABLE integration_deliveries ADD COLUMN recipient_name VARCHAR(120)") + if statements: + with mock_engine.begin() as connection: + for statement in statements: + connection.execute(text(statement)) + + def bootstrap_databases( *, run_tools_seed: bool | None = None, @@ -56,6 +80,7 @@ def bootstrap_databases( try: print("Criando tabelas MySQL (dados ficticios)...") MockBase.metadata.create_all(bind=mock_engine) + _ensure_mock_schema_evolution() if should_seed_mock: print("Populando dados ficticios iniciais...") seed_mock_data() diff --git a/app/db/mock_models.py b/app/db/mock_models.py index 7fc6831..05b4853 100644 --- a/app/db/mock_models.py +++ b/app/db/mock_models.py @@ -40,6 +40,7 @@ class User(MockBase): username = Column(String(120), nullable=True) cpf = Column(String(11), ForeignKey("customers.cpf"), nullable=True, index=True) phone = Column(String(30), nullable=True) + email = Column(String(255), nullable=True, index=True) created_at = Column(DateTime, server_default=func.current_timestamp()) updated_at = Column( DateTime, @@ -194,6 +195,8 @@ class IntegrationDelivery(MockBase): provider = Column(String(40), nullable=False, index=True) status = Column(String(20), nullable=False, default="pending", index=True) payload_json = Column(Text, nullable=False) + recipient_email = Column(String(255), nullable=True, index=True) + recipient_name = Column(String(120), nullable=True) rendered_subject = Column(Text, nullable=True) rendered_body = Column(Text, nullable=True) provider_message_id = Column(String(120), nullable=True, index=True) diff --git a/app/repositories/user_repository.py b/app/repositories/user_repository.py index a7ea49a..d9ee532 100644 --- a/app/repositories/user_repository.py +++ b/app/repositories/user_repository.py @@ -19,6 +19,12 @@ class UserRepository: .first() ) + def get_by_id(self, user_id: int | None): + """Busca usuario pelo identificador interno.""" + if user_id is None: + return None + return self.db.query(User).filter(User.id == user_id).first() + def create( self, channel: str, @@ -58,3 +64,18 @@ class UserRepository: self.db.refresh(user) return user + + def update_email( + self, + user: User, + email: str | None, + ): + """Atualiza email persistido do usuario.""" + normalized_email = str(email or "").strip().lower() or None + if normalized_email == user.email: + return user + + user.email = normalized_email + self.db.commit() + self.db.refresh(user) + return user diff --git a/app/services/domain/rental_service.py b/app/services/domain/rental_service.py index 1562dcd..1c5cfe7 100644 --- a/app/services/domain/rental_service.py +++ b/app/services/domain/rental_service.py @@ -37,7 +37,7 @@ def _parse_optional_datetime(value: str | None, *, field_name: str) -> datetime if not text: return None - normalized = re.sub(r"\s+(?:as|às)\s+", " ", text, flags=re.IGNORECASE) + normalized = re.sub(r"\s+(?:as|às)\s+", " ", text, flags=re.IGNORECASE) for candidate in (text, normalized): try: return datetime.fromisoformat(candidate.replace("Z", "+00:00")) @@ -408,6 +408,7 @@ async def abrir_locacao_aluguel( "status_veiculo": vehicle.status, "cpf": contract.cpf, "nome_cliente": _normalize_text_field(nome_cliente), + "user_id": contract.user_id, } await publish_business_event_safely(RENTAL_OPENED_EVENT, result) return result @@ -475,6 +476,7 @@ async def registrar_devolucao_aluguel( "valor_final": float(contract.valor_final) if contract.valor_final is not None else None, "status": contract.status, "status_veiculo": vehicle.status if vehicle is not None else None, + "user_id": contract.user_id, } await publish_business_event_safely(RENTAL_RETURN_REGISTERED_EVENT, result) return result @@ -549,6 +551,7 @@ async def registrar_pagamento_aluguel( "favorecido": record.favorecido, "identificador_comprovante": record.identificador_comprovante, "status": "registrado", + "user_id": record.user_id, } await publish_business_event_safely(RENTAL_PAYMENT_REGISTERED_EVENT, result) return result diff --git a/app/services/flows/order_flow.py b/app/services/flows/order_flow.py index 237ae35..ecd07cd 100644 --- a/app/services/flows/order_flow.py +++ b/app/services/flows/order_flow.py @@ -970,6 +970,16 @@ class OrderFlowMixin: active_task="order_create", ) self._reset_order_stock_context(user_id=user_id) + if hasattr(self, "_capture_successful_tool_side_effects"): + self._capture_successful_tool_side_effects( + tool_name="realizar_pedido", + arguments={ + "cpf": draft["payload"]["cpf"], + "vehicle_id": draft["payload"]["vehicle_id"], + }, + tool_result=tool_result, + user_id=user_id, + ) return self._fallback_format_tool_result("realizar_pedido", tool_result) @@ -1090,6 +1100,13 @@ class OrderFlowMixin: "order_cancel", active_task="order_cancel", ) + if hasattr(self, "_capture_successful_tool_side_effects"): + self._capture_successful_tool_side_effects( + tool_name="cancelar_pedido", + arguments=draft["payload"], + tool_result=tool_result, + user_id=user_id, + ) return self._fallback_format_tool_result("cancelar_pedido", tool_result) diff --git a/app/services/flows/rental_flow.py b/app/services/flows/rental_flow.py index 4fa0122..ea92363 100644 --- a/app/services/flows/rental_flow.py +++ b/app/services/flows/rental_flow.py @@ -612,5 +612,18 @@ class RentalFlowMixin: self._store_last_rental_contract(user_id=user_id, payload=tool_result) self._reset_pending_rental_states(user_id=user_id) + if hasattr(self, "_capture_successful_tool_side_effects"): + self._capture_successful_tool_side_effects( + tool_name="abrir_locacao_aluguel", + arguments={ + "rental_vehicle_id": draft_payload["rental_vehicle_id"], + "placa": draft_payload.get("placa"), + "data_inicio": draft_payload["data_inicio"], + "data_fim_prevista": draft_payload["data_fim_prevista"], + "cpf": draft_payload.get("cpf"), + }, + tool_result=tool_result, + user_id=user_id, + ) return self._fallback_format_tool_result("abrir_locacao_aluguel", tool_result) diff --git a/app/services/flows/review_flow.py b/app/services/flows/review_flow.py index 5f876f6..f02f542 100644 --- a/app/services/flows/review_flow.py +++ b/app/services/flows/review_flow.py @@ -899,5 +899,12 @@ class ReviewFlowMixin: ) self._store_last_review_package(user_id=user_id, payload=draft["payload"]) self._log_review_flow_source(source=review_flow_source or "draft", payload=draft["payload"]) + if hasattr(self, "_capture_successful_tool_side_effects"): + self._capture_successful_tool_side_effects( + tool_name="agendar_revisao", + arguments=draft["payload"], + tool_result=tool_result, + user_id=user_id, + ) return self._fallback_format_tool_result("agendar_revisao", tool_result) diff --git a/app/services/integrations/__init__.py b/app/services/integrations/__init__.py index ad1530a..42f9da1 100644 --- a/app/services/integrations/__init__.py +++ b/app/services/integrations/__init__.py @@ -1,4 +1,4 @@ -from app.services.integrations.events import ( +from app.services.integrations.events import ( ORDER_CANCELLED_EVENT, ORDER_CREATED_EVENT, RENTAL_OPENED_EVENT, @@ -8,10 +8,13 @@ from app.services.integrations.events import ( SUPPORTED_EVENT_TYPES, ) from app.services.integrations.service import ( + SUPPORTED_DELIVERY_STATUSES, emit_business_event, + list_integration_deliveries, list_integration_routes, process_pending_deliveries, publish_business_event_safely, + sync_user_email_integration_routes, upsert_email_integration_route, ) @@ -23,9 +26,12 @@ __all__ = [ "RENTAL_RETURN_REGISTERED_EVENT", "REVIEW_SCHEDULED_EVENT", "SUPPORTED_EVENT_TYPES", + "SUPPORTED_DELIVERY_STATUSES", "emit_business_event", + "list_integration_deliveries", "list_integration_routes", "process_pending_deliveries", "publish_business_event_safely", + "sync_user_email_integration_routes", "upsert_email_integration_route", ] diff --git a/app/services/integrations/providers.py b/app/services/integrations/providers.py index 8dd0f51..8c07491 100644 --- a/app/services/integrations/providers.py +++ b/app/services/integrations/providers.py @@ -1,3 +1,6 @@ +from collections.abc import Mapping +from typing import Any + import httpx from app.core.settings import settings @@ -7,6 +10,67 @@ class IntegrationProviderError(RuntimeError): """Erro de transporte ou configuracao em providers externos.""" +def _clean_text(value: Any) -> str | None: + text = str(value or "").strip() + return text or None + + +def _normalize_address(value: Any, *, fallback_name: Any = None) -> dict[str, str] | None: + if isinstance(value, Mapping): + email = _clean_text(value.get("email")) + name = _clean_text(value.get("name")) + else: + email = _clean_text(value) + name = _clean_text(fallback_name) + + if not email: + return None + + address = {"email": email} + if name: + address["name"] = name + return address + + +def _normalize_address_list(value: Any) -> list[dict[str, str]]: + if value is None: + return [] + + candidates = value if isinstance(value, (list, tuple, set)) else [value] + addresses: list[dict[str, str]] = [] + for candidate in candidates: + address = _normalize_address(candidate) + if address: + addresses.append(address) + return addresses + + +def _normalize_tags(*groups: Any) -> list[str]: + merged: list[str] = [] + for group in groups: + if group is None: + continue + candidates = group if isinstance(group, (list, tuple, set)) else [group] + for candidate in candidates: + tag = _clean_text(candidate) + if tag and tag not in merged: + merged.append(tag) + return merged + + +def _normalize_headers(value: Any) -> dict[str, str]: + if not isinstance(value, Mapping): + return {} + + headers: dict[str, str] = {} + for key, header_value in value.items(): + normalized_key = _clean_text(key) + normalized_value = _clean_text(header_value) + if normalized_key and normalized_value: + headers[normalized_key] = normalized_value + return headers + + class BrevoEmailProvider: provider_name = "brevo_email" @@ -17,8 +81,30 @@ class BrevoEmailProvider: self.sender_name = str(settings.brevo_sender_name or "Orquestrador").strip() or "Orquestrador" self.timeout_seconds = max(1, int(settings.brevo_request_timeout_seconds or 10)) - def is_configured(self) -> bool: - return bool(self.api_key and self.sender_email) + def _normalize_provider_config(self, provider_config: Mapping[str, Any] | None) -> dict[str, Any]: + if not isinstance(provider_config, Mapping): + return {} + return dict(provider_config) + + def _resolve_sender(self, provider_config: Mapping[str, Any]) -> tuple[str | None, str]: + sender = provider_config.get("sender") + sender_payload = _normalize_address(sender) if sender is not None else None + sender_email = ( + (sender_payload or {}).get("email") + or _clean_text(provider_config.get("sender_email")) + or self.sender_email + ) + sender_name = ( + (sender_payload or {}).get("name") + or _clean_text(provider_config.get("sender_name")) + or self.sender_name + ) + return sender_email, sender_name or "Orquestrador" + + def is_configured(self, provider_config: Mapping[str, Any] | None = None) -> bool: + normalized_provider_config = self._normalize_provider_config(provider_config) + sender_email, _sender_name = self._resolve_sender(normalized_provider_config) + return bool(self.api_key and sender_email) async def send_email( self, @@ -28,16 +114,19 @@ class BrevoEmailProvider: subject: str, body: str, tags: list[str] | None = None, + provider_config: Mapping[str, Any] | None = None, ) -> dict: - if not self.is_configured(): + normalized_provider_config = self._normalize_provider_config(provider_config) + sender_email, sender_name = self._resolve_sender(normalized_provider_config) + if not self.is_configured(normalized_provider_config): raise IntegrationProviderError( "Brevo nao configurado. Defina BREVO_API_KEY e BREVO_SENDER_EMAIL para enviar emails." ) payload = { "sender": { - "email": self.sender_email, - "name": self.sender_name, + "email": sender_email, + "name": sender_name, }, "to": [ { @@ -48,8 +137,33 @@ class BrevoEmailProvider: "subject": str(subject or "").strip(), "textContent": str(body or "").strip(), } - if tags: - payload["tags"] = [str(tag).strip() for tag in tags if str(tag).strip()] + + reply_to = _normalize_address( + normalized_provider_config.get("reply_to") or normalized_provider_config.get("reply_to_email"), + fallback_name=normalized_provider_config.get("reply_to_name"), + ) + if reply_to: + payload["replyTo"] = reply_to + + cc = _normalize_address_list(normalized_provider_config.get("cc")) + if cc: + payload["cc"] = cc + + bcc = _normalize_address_list(normalized_provider_config.get("bcc")) + if bcc: + payload["bcc"] = bcc + + merged_tags = _normalize_tags(tags, normalized_provider_config.get("tags")) + if merged_tags: + payload["tags"] = merged_tags + + headers = _normalize_headers(normalized_provider_config.get("headers")) + if headers: + payload["headers"] = headers + + html_content = _clean_text(normalized_provider_config.get("html_content")) + if html_content: + payload["htmlContent"] = html_content headers = { "accept": "application/json", diff --git a/app/services/integrations/service.py b/app/services/integrations/service.py index d10112a..52090c3 100644 --- a/app/services/integrations/service.py +++ b/app/services/integrations/service.py @@ -3,13 +3,20 @@ import json import logging from typing import Any -from sqlalchemy import or_ - from app.core.settings import settings from app.core.time_utils import utc_now from app.db.mock_database import SessionMockLocal from app.db.mock_models import IntegrationDelivery, IntegrationRoute -from app.services.integrations.events import SUPPORTED_EVENT_TYPES +from app.repositories.user_repository import UserRepository +from app.services.integrations.events import ( + ORDER_CANCELLED_EVENT, + ORDER_CREATED_EVENT, + RENTAL_OPENED_EVENT, + RENTAL_PAYMENT_REGISTERED_EVENT, + RENTAL_RETURN_REGISTERED_EVENT, + REVIEW_SCHEDULED_EVENT, + SUPPORTED_EVENT_TYPES, +) from app.services.integrations.providers import BrevoEmailProvider, IntegrationProviderError from app.services.integrations.templates import render_email_content @@ -19,6 +26,23 @@ logger = logging.getLogger(__name__) _PROVIDER_FACTORIES = { "brevo_email": BrevoEmailProvider, } +SUPPORTED_DELIVERY_STATUSES = ( + "pending", + "failed", + "sent", + "skipped", +) +USER_PROFILE_ROUTE_SCOPE = "user_profile" +USER_PROFILE_EVENT_TYPES = ( + ORDER_CREATED_EVENT, + ORDER_CANCELLED_EVENT, + REVIEW_SCHEDULED_EVENT, + RENTAL_OPENED_EVENT, + RENTAL_PAYMENT_REGISTERED_EVENT, + RENTAL_RETURN_REGISTERED_EVENT, +) +DYNAMIC_USER_ROUTE_EMAIL = "dynamic:user_profile" +DYNAMIC_USER_ROUTE_NAME = "Usuario do fluxo" def _clean_text(value: str | None) -> str | None: @@ -48,12 +72,52 @@ def _validate_event_type(event_type: str) -> str: return normalized +def _normalize_int(value: Any) -> int | None: + if value is None: + return None + text = str(value).strip() + if not text: + return None + try: + return int(text) + except (TypeError, ValueError): + return None + + +def _normalize_delivery_statuses(statuses: str | list[str] | tuple[str, ...] | None) -> list[str] | None: + if statuses is None: + return None + + if isinstance(statuses, str): + candidates = [statuses] + else: + candidates = list(statuses) + + normalized_statuses: list[str] = [] + for status in candidates: + normalized = _clean_text(status) + if not normalized: + continue + if normalized not in SUPPORTED_DELIVERY_STATUSES: + raise ValueError(f"unsupported integration delivery status: {status}") + if normalized not in normalized_statuses: + normalized_statuses.append(normalized) + + return normalized_statuses or None + + def _build_idempotency_key(*, route_id: int, event_type: str, payload: dict[str, Any]) -> str: raw = f"{route_id}:{event_type}:{_serialize_json(payload)}" return hashlib.sha256(raw.encode("utf-8")).hexdigest() +def _recipient_scope(provider_config: dict[str, Any] | None) -> str: + scope = str((provider_config or {}).get("recipient_scope") or "").strip().lower() + return scope or "fixed" + + def _serialize_route(route: IntegrationRoute) -> dict[str, Any]: + provider_config = _deserialize_json(route.provider_config_json) return { "id": route.id, "event_type": route.event_type, @@ -61,12 +125,74 @@ def _serialize_route(route: IntegrationRoute) -> dict[str, Any]: "enabled": bool(route.enabled), "recipient_email": route.recipient_email, "recipient_name": route.recipient_name, + "recipient_scope": _recipient_scope(provider_config), "subject_template": route.subject_template, "body_template": route.body_template, - "provider_config": _deserialize_json(route.provider_config_json), + "provider_config": provider_config, + "created_at": route.created_at.isoformat() if route.created_at else None, + "updated_at": route.updated_at.isoformat() if route.updated_at else None, } +def _is_user_profile_route_config(provider_config: dict[str, Any] | None) -> bool: + return _recipient_scope(provider_config) == USER_PROFILE_ROUTE_SCOPE + + +def _is_legacy_user_profile_route_config(provider_config: dict[str, Any] | None) -> bool: + return _is_user_profile_route_config(provider_config) and _normalize_int((provider_config or {}).get("user_id")) is not None + + +def _resolve_user_profile_recipient( + db, + *, + payload: dict[str, Any], + route: IntegrationRoute, +) -> dict[str, str] | None: + user_id = _normalize_int(payload.get("user_id")) + if user_id is None: + return None + + user = UserRepository(db).get_by_id(user_id=user_id) + if user is None: + return None + + email = _clean_text(getattr(user, "email", None)) + if not email: + return None + + recipient = {"email": email} + recipient_name = _clean_text(getattr(user, "name", None)) or _clean_text(route.recipient_name) + if recipient_name: + recipient["name"] = recipient_name + return recipient + + +def _resolve_route_recipient( + db, + *, + route: IntegrationRoute, + payload: dict[str, Any], + provider_config: dict[str, Any] | None = None, +) -> dict[str, str] | None: + normalized_provider_config = provider_config if isinstance(provider_config, dict) else _deserialize_json(route.provider_config_json) + if _is_user_profile_route_config(normalized_provider_config): + return _resolve_user_profile_recipient( + db, + payload=payload, + route=route, + ) + + email = _clean_text(route.recipient_email) + if not email: + return None + + recipient = {"email": email} + recipient_name = _clean_text(route.recipient_name) + if recipient_name: + recipient["name"] = recipient_name + return recipient + + def _serialize_delivery(delivery: IntegrationDelivery) -> dict[str, Any]: return { "id": delivery.id, @@ -76,6 +202,8 @@ def _serialize_delivery(delivery: IntegrationDelivery) -> dict[str, Any]: "status": delivery.status, "attempts": int(delivery.attempts or 0), "payload": _deserialize_json(delivery.payload_json), + "recipient_email": delivery.recipient_email, + "recipient_name": delivery.recipient_name, "rendered_subject": delivery.rendered_subject, "rendered_body": delivery.rendered_body, "provider_message_id": delivery.provider_message_id, @@ -83,6 +211,7 @@ def _serialize_delivery(delivery: IntegrationDelivery) -> dict[str, Any]: "idempotency_key": delivery.idempotency_key, "dispatched_at": delivery.dispatched_at.isoformat() if delivery.dispatched_at else None, "created_at": delivery.created_at.isoformat() if delivery.created_at else None, + "updated_at": delivery.updated_at.isoformat() if delivery.updated_at else None, } @@ -93,22 +222,64 @@ def _get_provider(provider_name: str): return factory() -def list_integration_routes(*, event_type: str | None = None, provider: str | None = None) -> list[dict[str, Any]]: +def list_integration_routes( + *, + event_type: str | None = None, + provider: str | None = None, + enabled: bool | None = None, +) -> list[dict[str, Any]]: db = SessionMockLocal() try: query = db.query(IntegrationRoute) - normalized_event_type = _clean_text(event_type) + normalized_event_type = _validate_event_type(event_type) if _clean_text(event_type) else None if normalized_event_type: query = query.filter(IntegrationRoute.event_type == normalized_event_type) normalized_provider = _clean_text(provider) if normalized_provider: query = query.filter(IntegrationRoute.provider == normalized_provider) + if enabled is not None: + query = query.filter(IntegrationRoute.enabled.is_(bool(enabled))) routes = query.order_by(IntegrationRoute.event_type.asc(), IntegrationRoute.id.asc()).all() return [_serialize_route(route) for route in routes] finally: db.close() +def list_integration_deliveries( + *, + statuses: str | list[str] | tuple[str, ...] | None = None, + event_type: str | None = None, + provider: str | None = None, + route_id: int | None = None, + limit: int = 50, +) -> list[dict[str, Any]]: + normalized_statuses = _normalize_delivery_statuses(statuses) + normalized_event_type = _validate_event_type(event_type) if _clean_text(event_type) else None + normalized_provider = _clean_text(provider) + normalized_route_id = int(route_id) if route_id is not None else None + normalized_limit = max(1, int(limit)) if limit and int(limit) > 0 else None + + db = SessionMockLocal() + try: + query = db.query(IntegrationDelivery) + if normalized_statuses: + query = query.filter(IntegrationDelivery.status.in_(normalized_statuses)) + if normalized_event_type: + query = query.filter(IntegrationDelivery.event_type == normalized_event_type) + if normalized_provider: + query = query.filter(IntegrationDelivery.provider == normalized_provider) + if normalized_route_id is not None: + query = query.filter(IntegrationDelivery.route_id == normalized_route_id) + + query = query.order_by(IntegrationDelivery.id.desc()) + if normalized_limit is not None: + query = query.limit(normalized_limit) + rows = query.all() + return [_serialize_delivery(row) for row in rows] + finally: + db.close() + + def upsert_email_integration_route( *, event_type: str, @@ -122,7 +293,10 @@ def upsert_email_integration_route( ) -> dict[str, Any]: normalized_event_type = _validate_event_type(event_type) normalized_provider = _clean_text(provider) or "brevo_email" + normalized_provider_config = dict(provider_config or {}) normalized_email = _clean_text(recipient_email) + if not normalized_email and _is_user_profile_route_config(normalized_provider_config): + normalized_email = DYNAMIC_USER_ROUTE_EMAIL if not normalized_email: raise ValueError("recipient_email is required") @@ -147,7 +321,7 @@ def upsert_email_integration_route( route.recipient_name = _clean_text(recipient_name) route.subject_template = _clean_text(subject_template) route.body_template = _clean_text(body_template) - route.provider_config_json = _serialize_json(provider_config) + route.provider_config_json = _serialize_json(normalized_provider_config) db.commit() db.refresh(route) return _serialize_route(route) @@ -155,6 +329,56 @@ def upsert_email_integration_route( db.close() +def sync_user_email_integration_routes( + *, + user_id: int, + recipient_email: str, + recipient_name: str | None = None, +) -> list[dict[str, Any]]: + normalized_user_id = _normalize_int(user_id) + normalized_email = _clean_text(recipient_email) + if normalized_user_id is None: + raise ValueError("user_id is required") + if not normalized_email: + raise ValueError("recipient_email is required") + + dynamic_provider_config = { + "recipient_scope": USER_PROFILE_ROUTE_SCOPE, + } + + db = SessionMockLocal() + try: + routes = ( + db.query(IntegrationRoute) + .filter(IntegrationRoute.provider == "brevo_email") + .filter(IntegrationRoute.event_type.in_(USER_PROFILE_EVENT_TYPES)) + .all() + ) + changed = False + for route in routes: + provider_config = _deserialize_json(route.provider_config_json) + if _is_legacy_user_profile_route_config(provider_config) and route.enabled: + route.enabled = False + changed = True + if changed: + db.commit() + finally: + db.close() + + synced_routes: list[dict[str, Any]] = [] + for event_type in USER_PROFILE_EVENT_TYPES: + synced_routes.append( + upsert_email_integration_route( + event_type=event_type, + recipient_email=DYNAMIC_USER_ROUTE_EMAIL, + recipient_name=DYNAMIC_USER_ROUTE_NAME, + enabled=True, + provider_config=dynamic_provider_config, + ) + ) + return synced_routes + + async def emit_business_event(event_type: str, payload: dict[str, Any] | None) -> list[dict[str, Any]]: if not settings.integrations_enabled: return [] @@ -172,6 +396,16 @@ async def emit_business_event(event_type: str, payload: dict[str, Any] | None) - .all() ) for route in routes: + route_provider_config = _deserialize_json(route.provider_config_json) + recipient = _resolve_route_recipient( + db, + route=route, + payload=normalized_payload, + provider_config=route_provider_config, + ) + if recipient is None: + continue + idempotency_key = _build_idempotency_key( route_id=route.id, event_type=normalized_event_type, @@ -192,6 +426,8 @@ async def emit_business_event(event_type: str, payload: dict[str, Any] | None) - provider=route.provider, status="pending", payload_json=_serialize_json(normalized_payload), + recipient_email=recipient.get("email"), + recipient_name=recipient.get("name"), idempotency_key=idempotency_key, ) db.add(delivery) @@ -245,6 +481,7 @@ async def dispatch_delivery(delivery_id: int) -> dict[str, Any] | None: return _serialize_delivery(delivery) payload = _deserialize_json(delivery.payload_json) + route_provider_config = _deserialize_json(route.provider_config_json) subject, body = render_email_content( event_type=delivery.event_type, payload=payload, @@ -255,14 +492,35 @@ async def dispatch_delivery(delivery_id: int) -> dict[str, Any] | None: delivery.rendered_body = body delivery.attempts = int(delivery.attempts or 0) + 1 + recipient_email = _clean_text(delivery.recipient_email) + recipient_name = _clean_text(delivery.recipient_name) + if not recipient_email: + recipient = _resolve_route_recipient( + db, + route=route, + payload=payload, + provider_config=route_provider_config, + ) + if recipient is None: + delivery.status = "skipped" + delivery.last_error = "Recipient email unavailable for delivery." + db.commit() + db.refresh(delivery) + return _serialize_delivery(delivery) + recipient_email = recipient.get("email") + recipient_name = recipient.get("name") + delivery.recipient_email = recipient_email + delivery.recipient_name = recipient_name + try: provider = _get_provider(route.provider) result = await provider.send_email( - to_email=route.recipient_email, - to_name=route.recipient_name, + to_email=recipient_email, + to_name=recipient_name, subject=subject, body=body, tags=[delivery.event_type], + provider_config=route_provider_config, ) except IntegrationProviderError as exc: delivery.status = "failed" @@ -286,23 +544,38 @@ async def process_pending_deliveries( *, limit: int = 20, delivery_ids: list[int] | None = None, + statuses: str | list[str] | tuple[str, ...] | None = None, + event_type: str | None = None, + provider: str | None = None, + route_id: int | None = None, ) -> list[dict[str, Any]]: + normalized_statuses = _normalize_delivery_statuses(statuses) + normalized_event_type = _validate_event_type(event_type) if _clean_text(event_type) else None + normalized_provider = _clean_text(provider) + normalized_route_id = int(route_id) if route_id is not None else None + normalized_delivery_ids = sorted({int(delivery_id) for delivery_id in delivery_ids or []}) + + if not normalized_delivery_ids and normalized_statuses is None: + normalized_statuses = ["pending", "failed"] + db = SessionMockLocal() try: query = db.query(IntegrationDelivery) - if delivery_ids: - query = query.filter(IntegrationDelivery.id.in_(delivery_ids)) + if normalized_delivery_ids: + query = query.filter(IntegrationDelivery.id.in_(normalized_delivery_ids)) else: - query = query.filter( - or_( - IntegrationDelivery.status == "pending", - IntegrationDelivery.status == "failed", - ) - ) + if normalized_statuses: + query = query.filter(IntegrationDelivery.status.in_(normalized_statuses)) + if normalized_event_type: + query = query.filter(IntegrationDelivery.event_type == normalized_event_type) + if normalized_provider: + query = query.filter(IntegrationDelivery.provider == normalized_provider) + if normalized_route_id is not None: + query = query.filter(IntegrationDelivery.route_id == normalized_route_id) query = query.order_by(IntegrationDelivery.id.asc()) if limit and int(limit) > 0: query = query.limit(max(1, int(limit))) - if delivery_ids: + if normalized_delivery_ids: query = query.order_by(IntegrationDelivery.id.asc()) selected_ids = [row.id for row in query.all()] finally: diff --git a/app/services/orchestration/conversation_state_store.py b/app/services/orchestration/conversation_state_store.py index 883df31..11abfc2 100644 --- a/app/services/orchestration/conversation_state_store.py +++ b/app/services/orchestration/conversation_state_store.py @@ -21,6 +21,7 @@ class ConversationStateStore(ConversationStateRepository): self.pending_stock_selections: dict[int, dict] = {} self.pending_rental_drafts: dict[int, dict] = {} self.pending_rental_selections: dict[int, dict] = {} + self.pending_email_capture_requests: dict[int, dict] = {} self.telegram_processed_messages: dict[int, dict] = {} self.telegram_runtime_state: dict[int, dict] = {} diff --git a/app/services/orchestration/orquestrador_service.py b/app/services/orchestration/orquestrador_service.py index b983f32..9eeb61f 100644 --- a/app/services/orchestration/orquestrador_service.py +++ b/app/services/orchestration/orquestrador_service.py @@ -9,6 +9,17 @@ from uuid import uuid4 from fastapi import HTTPException from sqlalchemy.orm import Session +from app.db.mock_database import SessionMockLocal +from app.repositories.user_repository import UserRepository +from app.services.integrations.events import ( + ORDER_CANCELLED_EVENT, + ORDER_CREATED_EVENT, + RENTAL_OPENED_EVENT, + RENTAL_PAYMENT_REGISTERED_EVENT, + RENTAL_RETURN_REGISTERED_EVENT, + REVIEW_SCHEDULED_EVENT, +) +from app.services.integrations.service import emit_business_event, sync_user_email_integration_routes from app.services.orchestration.orchestrator_config import ( LOW_VALUE_RESPONSES, ORCHESTRATION_CONTROL_TOOLS, @@ -39,6 +50,18 @@ from app.services.orchestration.response_formatter import format_currency_br, fo logger = logging.getLogger(__name__) +EMAIL_CAPTURE_BUCKET = "pending_email_capture_requests" +EMAIL_CAPTURE_TTL_MINUTES = 30 +EMAIL_CAPTURE_PATTERN = re.compile(r"^[A-Z0-9._%+-]+@[A-Z0-9.-]+\.[A-Z]{2,}$", re.IGNORECASE) +EMAIL_CAPTURE_EVENT_BY_TOOL = { + "realizar_pedido": ORDER_CREATED_EVENT, + "cancelar_pedido": ORDER_CANCELLED_EVENT, + "agendar_revisao": REVIEW_SCHEDULED_EVENT, + "abrir_locacao_aluguel": RENTAL_OPENED_EVENT, + "registrar_pagamento_aluguel": RENTAL_PAYMENT_REGISTERED_EVENT, + "registrar_devolucao_aluguel": RENTAL_RETURN_REGISTERED_EVENT, +} + # Coordenador principal do turno conversacional: # atualiza estado, pede decisoes ao modelo, continua fluxos e executa tools. @@ -57,6 +80,7 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin, RentalFlowMixin): self.tool_executor = ToolExecutor(registry=self.registry) self.policy = ConversationPolicy(service=self) self.history_service = ConversationHistoryService() + self._user_profile_routes_ready = False @property def _context_manager(self) -> OrchestratorContextManager: @@ -112,6 +136,10 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin, RentalFlowMixin): base_response=composed, user_id=user_id, ) + final_response = self._append_email_capture_prompt_if_needed( + response=final_response, + user_id=user_id, + ) turn_trace["elapsed_ms"] = round((perf_counter() - turn_started_perf) * 1000, 2) self._log_turn_event("turn_completed", response=final_response) if not turn_history_persisted: @@ -125,6 +153,13 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin, RentalFlowMixin): try: self._upsert_user_context(user_id=user_id) + self._ensure_user_email_routes(user_id=user_id) + pending_email_capture_response = await self._try_handle_pending_email_capture_message( + message=message, + user_id=user_id, + ) + if pending_email_capture_response: + return await finish(pending_email_capture_response) if hasattr(self, "policy") and self._is_order_selection_reset_message(message): reset_override = await self._try_handle_immediate_context_reset( message=message, @@ -1659,6 +1694,189 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin, RentalFlowMixin): def _save_user_context(self, user_id: int | None, context: dict | None) -> None: self._context_manager.save_user_context(user_id=user_id, context=context) + def _get_user_record(self, user_id: int | None): + if user_id is None: + return None + db = SessionMockLocal() + try: + return UserRepository(db).get_by_id(user_id=user_id) + except Exception: + logger.debug( + "Falha ao carregar cadastro do usuario para email.", + extra={"user_id": user_id}, + ) + return None + finally: + db.close() + + def _get_saved_user_email(self, user_id: int | None) -> str | None: + user = self._get_user_record(user_id=user_id) + return str(getattr(user, "email", "") or "").strip().lower() or None + + def _save_user_email(self, user_id: int | None, email: str | None): + if user_id is None: + return None + db = SessionMockLocal() + try: + repo = UserRepository(db) + user = repo.get_by_id(user_id=user_id) + if not user: + return None + return repo.update_email(user=user, email=email) + except Exception: + logger.debug( + "Falha ao salvar email do usuario.", + extra={"user_id": user_id}, + ) + return None + finally: + db.close() + + def _ensure_user_email_routes(self, user_id: int | None) -> None: + if getattr(self, "_user_profile_routes_ready", False): + return + user = self._get_user_record(user_id=user_id) + if not user or not getattr(user, "email", None): + return + try: + sync_user_email_integration_routes( + user_id=user.id, + recipient_email=user.email, + recipient_name=user.name, + ) + self._user_profile_routes_ready = True + except Exception: + logger.exception( + "Falha ao sincronizar rotas de email do usuario.", + extra={"user_id": user_id}, + ) + + def _normalize_email_address(self, value: str | None) -> str | None: + normalized = str(value or "").strip().lower() + if not normalized: + return None + if not EMAIL_CAPTURE_PATTERN.fullmatch(normalized): + return None + return normalized + + def _is_email_capture_decline_message(self, text: str) -> bool: + normalized = self._normalize_text(text).strip().rstrip(".!?,;:") + return normalized in { + "nao", + "nao quero", + "nao quero informar", + "prefiro nao informar", + "agora nao", + "sem email", + } + + def _get_pending_email_capture_request(self, user_id: int | None) -> dict | None: + state = getattr(self, "state", None) + if state is None or not hasattr(state, "get_entry"): + return None + return state.get_entry(EMAIL_CAPTURE_BUCKET, user_id, expire=True) + + def _clear_pending_email_capture_request(self, user_id: int | None) -> None: + state = getattr(self, "state", None) + if state is None or not hasattr(state, "pop_entry"): + return + state.pop_entry(EMAIL_CAPTURE_BUCKET, user_id) + + def _stage_email_capture_request( + self, + tool_name: str, + tool_result, + user_id: int | None, + ) -> None: + state = getattr(self, "state", None) + if ( + user_id is None + or tool_name not in EMAIL_CAPTURE_EVENT_BY_TOOL + or not isinstance(tool_result, dict) + or state is None + or not hasattr(state, "set_entry") + ): + return + if self._get_saved_user_email(user_id=user_id): + return + + payload = dict(tool_result) + payload.setdefault("user_id", user_id) + state.set_entry( + EMAIL_CAPTURE_BUCKET, + user_id, + { + "request_id": str((getattr(self, "_turn_trace", {}) or {}).get("request_id") or ""), + "event_type": EMAIL_CAPTURE_EVENT_BY_TOOL[tool_name], + "payload": payload, + "expires_at": utc_now() + timedelta(minutes=EMAIL_CAPTURE_TTL_MINUTES), + }, + ) + + def _append_email_capture_prompt_if_needed(self, response: str, user_id: int | None) -> str: + if user_id is None or self._get_saved_user_email(user_id=user_id): + return response + pending = self._get_pending_email_capture_request(user_id=user_id) + current_request_id = str((getattr(self, "_turn_trace", {}) or {}).get("request_id") or "") + if not pending or pending.get("request_id") != current_request_id: + return response + prompt = ( + "Se quiser, posso te enviar esse resumo por e-mail. " + "Responda com um e-mail valido ou diga 'prefiro nao informar'." + ) + base = str(response or "").rstrip() + return f"{base}\n\n{prompt}" if base else prompt + + async def _try_handle_pending_email_capture_message( + self, + message: str, + user_id: int | None, + ) -> str | None: + if user_id is None: + return None + pending = self._get_pending_email_capture_request(user_id=user_id) + if not pending: + return None + + if self._is_email_capture_decline_message(message): + self._clear_pending_email_capture_request(user_id=user_id) + return "Tudo bem. Nao vou enviar este resumo por e-mail." + + normalized_email = self._normalize_email_address(message) + if not normalized_email: + return None + + user = self._save_user_email(user_id=user_id, email=normalized_email) + if not user: + self._clear_pending_email_capture_request(user_id=user_id) + return "Nao consegui localizar seu cadastro para salvar o e-mail." + + self._ensure_user_email_routes(user_id=user_id) + event_type = str(pending.get("event_type") or "").strip() + payload = dict(pending.get("payload") or {}) + payload.setdefault("user_id", user_id) + + deliveries = [] + if event_type and payload: + try: + deliveries = await emit_business_event(event_type=event_type, payload=payload) + except Exception: + logger.exception( + "Falha ao reenviar evento apos captura de email do usuario.", + extra={"user_id": user_id, "event_type": event_type}, + ) + + self._clear_pending_email_capture_request(user_id=user_id) + delivered = any( + isinstance(item, dict) + and item.get("status") == "sent" + and str(item.get("provider_message_id") or "").strip() + for item in deliveries + ) + if delivered: + return f"Perfeito. Salvei seu e-mail {normalized_email} e enviei este resumo por la." + return f"Perfeito. Salvei seu e-mail {normalized_email}. Vou usar esse endereco nos proximos envios." + def _extract_generic_memory_fields(self, llm_generic_fields: dict | None = None) -> dict: return self._context_manager.extract_generic_memory_fields( llm_generic_fields=llm_generic_fields, @@ -1699,6 +1917,11 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin, RentalFlowMixin): tool_result=tool_result, user_id=user_id, ) + self._stage_email_capture_request( + tool_name=tool_name, + tool_result=tool_result, + user_id=user_id, + ) async def _maybe_build_stock_suggestion_response( self, @@ -2061,25 +2284,7 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin, RentalFlowMixin): if decision_intent != "review_schedule": return False - entities = extracted_entities if isinstance(extracted_entities, dict) else {} - review_fields = entities.get("review_fields") - generic_memory = entities.get("generic_memory") - if not isinstance(review_fields, dict): - review_fields = {} - if not isinstance(generic_memory, dict): - generic_memory = {} - - return any( - ( - review_fields.get("placa"), - review_fields.get("data_hora"), - review_fields.get("modelo"), - review_fields.get("ano"), - review_fields.get("km"), - review_fields.get("revisao_previa_concessionaria"), - generic_memory.get("placa"), - ) - ) + return True def _has_trade_in_evaluation_request(self, message: str, turn_decision: dict | None = None) -> bool: normalized_message = self._normalize_text(message or "").strip() @@ -2748,5 +2953,3 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin, RentalFlowMixin): tool_name=tool_name, tool_result=tool_result, ) - - diff --git a/scripts/list_integration_deliveries.py b/scripts/list_integration_deliveries.py new file mode 100644 index 0000000..56dcdd5 --- /dev/null +++ b/scripts/list_integration_deliveries.py @@ -0,0 +1,34 @@ +import argparse +import json +import sys +from pathlib import Path + +PROJECT_ROOT = Path(__file__).resolve().parents[1] +if str(PROJECT_ROOT) not in sys.path: + sys.path.insert(0, str(PROJECT_ROOT)) + +from app.services.integrations.events import SUPPORTED_EVENT_TYPES +from app.services.integrations.service import SUPPORTED_DELIVERY_STATUSES, list_integration_deliveries + + +def main() -> None: + parser = argparse.ArgumentParser(description="Lista entregas do outbox de integracoes com filtros operacionais.") + parser.add_argument("--status", action="append", choices=SUPPORTED_DELIVERY_STATUSES) + parser.add_argument("--event", choices=SUPPORTED_EVENT_TYPES) + parser.add_argument("--provider") + parser.add_argument("--route-id", type=int) + parser.add_argument("--limit", type=int, default=50) + args = parser.parse_args() + + deliveries = list_integration_deliveries( + statuses=args.status, + event_type=args.event, + provider=args.provider, + route_id=args.route_id, + limit=args.limit, + ) + print(json.dumps(deliveries, ensure_ascii=True, indent=2, sort_keys=True)) + + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/scripts/list_integration_routes.py b/scripts/list_integration_routes.py new file mode 100644 index 0000000..153219e --- /dev/null +++ b/scripts/list_integration_routes.py @@ -0,0 +1,38 @@ +import argparse +import json +import sys +from pathlib import Path + +PROJECT_ROOT = Path(__file__).resolve().parents[1] +if str(PROJECT_ROOT) not in sys.path: + sys.path.insert(0, str(PROJECT_ROOT)) + +from app.services.integrations.events import SUPPORTED_EVENT_TYPES +from app.services.integrations.service import list_integration_routes + + +def main() -> None: + parser = argparse.ArgumentParser(description="Lista rotas de integracao configuradas.") + parser.add_argument("--event", choices=SUPPORTED_EVENT_TYPES) + parser.add_argument("--provider") + enabled_group = parser.add_mutually_exclusive_group() + enabled_group.add_argument("--enabled", action="store_true") + enabled_group.add_argument("--disabled", action="store_true") + args = parser.parse_args() + + enabled = None + if args.enabled: + enabled = True + if args.disabled: + enabled = False + + routes = list_integration_routes( + event_type=args.event, + provider=args.provider, + enabled=enabled, + ) + print(json.dumps(routes, ensure_ascii=True, indent=2, sort_keys=True)) + + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/scripts/process_integration_deliveries.py b/scripts/process_integration_deliveries.py index d349625..783d239 100644 --- a/scripts/process_integration_deliveries.py +++ b/scripts/process_integration_deliveries.py @@ -1,21 +1,57 @@ import argparse import asyncio import json +import sys +from pathlib import Path -from app.services.integrations.service import process_pending_deliveries +PROJECT_ROOT = Path(__file__).resolve().parents[1] +if str(PROJECT_ROOT) not in sys.path: + sys.path.insert(0, str(PROJECT_ROOT)) +from app.services.integrations.events import SUPPORTED_EVENT_TYPES +from app.services.integrations.service import SUPPORTED_DELIVERY_STATUSES, process_pending_deliveries -async def _main_async(limit: int) -> None: - deliveries = await process_pending_deliveries(limit=limit) + +async def _main_async( + *, + limit: int, + delivery_ids: list[int] | None, + statuses: list[str] | None, + event_type: str | None, + provider: str | None, + route_id: int | None, +) -> None: + deliveries = await process_pending_deliveries( + limit=limit, + delivery_ids=delivery_ids, + statuses=statuses, + event_type=event_type, + provider=provider, + route_id=route_id, + ) print(json.dumps(deliveries, ensure_ascii=True, indent=2, sort_keys=True)) def main() -> None: parser = argparse.ArgumentParser(description="Processa entregas pendentes do outbox de integracoes.") parser.add_argument("--limit", type=int, default=20) + parser.add_argument("--delivery-id", dest="delivery_ids", type=int, action="append") + parser.add_argument("--status", action="append", choices=SUPPORTED_DELIVERY_STATUSES) + parser.add_argument("--event", choices=SUPPORTED_EVENT_TYPES) + parser.add_argument("--provider") + parser.add_argument("--route-id", type=int) args = parser.parse_args() - asyncio.run(_main_async(limit=args.limit)) + asyncio.run( + _main_async( + limit=args.limit, + delivery_ids=args.delivery_ids, + statuses=args.status, + event_type=args.event, + provider=args.provider, + route_id=args.route_id, + ) + ) if __name__ == "__main__": - main() + main() \ No newline at end of file diff --git a/scripts/upsert_integration_route.py b/scripts/upsert_integration_route.py index d0b5626..f4f4ffd 100644 --- a/scripts/upsert_integration_route.py +++ b/scripts/upsert_integration_route.py @@ -1,10 +1,93 @@ import argparse import json +import sys +from pathlib import Path +from typing import Any + +PROJECT_ROOT = Path(__file__).resolve().parents[1] +if str(PROJECT_ROOT) not in sys.path: + sys.path.insert(0, str(PROJECT_ROOT)) from app.services.integrations.events import SUPPORTED_EVENT_TYPES from app.services.integrations.service import upsert_email_integration_route +def _deep_merge_dicts(base: dict[str, Any], extra: dict[str, Any]) -> dict[str, Any]: + merged = dict(base) + for key, value in extra.items(): + current = merged.get(key) + if isinstance(current, dict) and isinstance(value, dict): + merged[key] = _deep_merge_dicts(current, value) + else: + merged[key] = value + return merged + + +def _parse_provider_config_json(raw_value: str | None) -> dict[str, Any]: + if not raw_value: + return {} + try: + parsed = json.loads(raw_value) + except ValueError as exc: + raise SystemExit(f"provider_config_json invalido: {exc}") from exc + if not isinstance(parsed, dict): + raise SystemExit("provider_config_json deve ser um objeto JSON.") + return parsed + + +def _parse_headers(values: list[str] | None) -> dict[str, str]: + headers: dict[str, str] = {} + for value in values or []: + key, separator, header_value = str(value or "").partition("=") + key = key.strip() + header_value = header_value.strip() + if not separator or not key or not header_value: + raise SystemExit(f"header invalido: {value}. Use o formato Chave=Valor.") + headers[key] = header_value + return headers + + +def _build_provider_config(args) -> dict[str, Any] | None: + provider_config = _parse_provider_config_json(args.provider_config_json) + convenience_config: dict[str, Any] = {} + + if args.sender_email or args.sender_name: + convenience_config["sender"] = { + key: value + for key, value in { + "email": args.sender_email, + "name": args.sender_name, + }.items() + if value + } + + if args.reply_to_email or args.reply_to_name: + convenience_config["reply_to"] = { + key: value + for key, value in { + "email": args.reply_to_email, + "name": args.reply_to_name, + }.items() + if value + } + + if args.cc: + convenience_config["cc"] = args.cc + if args.bcc: + convenience_config["bcc"] = args.bcc + if args.tag: + convenience_config["tags"] = args.tag + if args.html_content: + convenience_config["html_content"] = args.html_content + + headers = _parse_headers(args.header) + if headers: + convenience_config["headers"] = headers + + merged = _deep_merge_dicts(provider_config, convenience_config) + return merged or None + + def main() -> None: parser = argparse.ArgumentParser(description="Cria ou atualiza uma rota de integracao por email.") parser.add_argument("--event", required=True, choices=SUPPORTED_EVENT_TYPES) @@ -12,6 +95,16 @@ def main() -> None: parser.add_argument("--name") parser.add_argument("--subject-template") parser.add_argument("--body-template") + parser.add_argument("--provider-config-json") + parser.add_argument("--sender-email") + parser.add_argument("--sender-name") + parser.add_argument("--reply-to-email") + parser.add_argument("--reply-to-name") + parser.add_argument("--cc", action="append") + parser.add_argument("--bcc", action="append") + parser.add_argument("--tag", action="append") + parser.add_argument("--header", action="append") + parser.add_argument("--html-content") parser.add_argument("--disabled", action="store_true") args = parser.parse_args() @@ -22,6 +115,7 @@ def main() -> None: subject_template=args.subject_template, body_template=args.body_template, enabled=not args.disabled, + provider_config=_build_provider_config(args), ) print(json.dumps(route, ensure_ascii=True, indent=2, sort_keys=True)) diff --git a/tests/test_brevo_provider.py b/tests/test_brevo_provider.py new file mode 100644 index 0000000..9f6cefb --- /dev/null +++ b/tests/test_brevo_provider.py @@ -0,0 +1,100 @@ +import os +import unittest +from unittest.mock import AsyncMock, patch + +os.environ.setdefault("DEBUG", "false") + +from app.core.settings import settings +from app.services.integrations.providers import BrevoEmailProvider + + +class _FakeResponse: + def __init__(self, *, status_code: int = 201, payload: dict | None = None, text: str = "") -> None: + self.status_code = status_code + self._payload = payload or {} + self.text = text + + def json(self) -> dict: + return self._payload + + +class BrevoEmailProviderTests(unittest.IsolatedAsyncioTestCase): + async def test_send_email_applies_provider_config_to_payload(self): + response = _FakeResponse(payload={"messageId": "brevo-123"}, text='{"messageId":"brevo-123"}') + post_mock = AsyncMock(return_value=response) + fake_client = type("FakeClient", (), {"post": post_mock})() + + with patch.object(settings, "brevo_api_key", "brevo-key"), patch.object( + settings, + "brevo_sender_email", + "sender@empresa.com", + ), patch.object(settings, "brevo_sender_name", "Orquestrador"), patch( + "app.services.integrations.providers.httpx.AsyncClient" + ) as client_cls: + client_cls.return_value.__aenter__ = AsyncMock(return_value=fake_client) + client_cls.return_value.__aexit__ = AsyncMock(return_value=None) + provider = BrevoEmailProvider() + result = await provider.send_email( + to_email="destinatario@empresa.com", + to_name="Operacoes", + subject="Pedido criado", + body="Pedido PED-1 criado com sucesso.", + tags=["order.created"], + provider_config={ + "sender": {"email": "noreply@empresa.com", "name": "Operacoes"}, + "reply_to": {"email": "atendimento@empresa.com", "name": "Atendimento"}, + "cc": ["financeiro@empresa.com", {"email": "gestor@empresa.com", "name": "Gestor"}], + "bcc": ["auditoria@empresa.com"], + "tags": ["ops", "order.created"], + "headers": {"X-Canal": "orquestrador"}, + "html_content": "Pedido PED-1 criado com sucesso.", + }, + ) + + payload = post_mock.await_args.kwargs["json"] + self.assertEqual(payload["sender"], {"email": "noreply@empresa.com", "name": "Operacoes"}) + self.assertEqual(payload["replyTo"], {"email": "atendimento@empresa.com", "name": "Atendimento"}) + self.assertEqual( + payload["cc"], + [ + {"email": "financeiro@empresa.com"}, + {"email": "gestor@empresa.com", "name": "Gestor"}, + ], + ) + self.assertEqual(payload["bcc"], [{"email": "auditoria@empresa.com"}]) + self.assertEqual(payload["tags"], ["order.created", "ops"]) + self.assertEqual(payload["headers"], {"X-Canal": "orquestrador"}) + self.assertEqual(payload["htmlContent"], "Pedido PED-1 criado com sucesso.") + self.assertEqual(result["message_id"], "brevo-123") + + async def test_send_email_accepts_route_sender_without_global_sender_email(self): + response = _FakeResponse(payload={"messageId": "brevo-456"}, text='{"messageId":"brevo-456"}') + post_mock = AsyncMock(return_value=response) + fake_client = type("FakeClient", (), {"post": post_mock})() + + with patch.object(settings, "brevo_api_key", "brevo-key"), patch.object( + settings, + "brevo_sender_email", + "", + ), patch.object(settings, "brevo_sender_name", "Orquestrador"), patch( + "app.services.integrations.providers.httpx.AsyncClient" + ) as client_cls: + client_cls.return_value.__aenter__ = AsyncMock(return_value=fake_client) + client_cls.return_value.__aexit__ = AsyncMock(return_value=None) + provider = BrevoEmailProvider() + await provider.send_email( + to_email="destinatario@empresa.com", + to_name=None, + subject="Pedido criado", + body="Pedido PED-1 criado com sucesso.", + provider_config={ + "sender": {"email": "noreply@empresa.com", "name": "Operacoes"}, + }, + ) + + payload = post_mock.await_args.kwargs["json"] + self.assertEqual(payload["sender"], {"email": "noreply@empresa.com", "name": "Operacoes"}) + + +if __name__ == "__main__": + unittest.main() \ No newline at end of file diff --git a/tests/test_conversation_state_store.py b/tests/test_conversation_state_store.py index 39cf44d..8c5002b 100644 --- a/tests/test_conversation_state_store.py +++ b/tests/test_conversation_state_store.py @@ -1,5 +1,6 @@ -import unittest +import unittest +from app.core.time_utils import utc_now from app.services.orchestration.conversation_state_store import ConversationStateStore @@ -34,6 +35,21 @@ class ConversationStateStoreTests(unittest.TestCase): self.assertEqual(stored_context["active_task"], "order_create") self.assertEqual(stored_context["expires_at"], original_expires_at) + def test_pending_email_capture_bucket_supports_set_get_and_pop(self): + store = ConversationStateStore() + payload = { + "request_id": "req-1", + "event_type": "order.created", + "payload": {"numero_pedido": "PED-1", "user_id": 7}, + "expires_at": utc_now(), + } + + store.set_entry("pending_email_capture_requests", 7, payload) + + self.assertEqual(store.get_entry("pending_email_capture_requests", 7), payload) + self.assertEqual(store.pop_entry("pending_email_capture_requests", 7), payload) + self.assertIsNone(store.get_entry("pending_email_capture_requests", 7)) + if __name__ == "__main__": unittest.main() diff --git a/tests/test_integration_service.py b/tests/test_integration_service.py index fe4706a..31f7d34 100644 --- a/tests/test_integration_service.py +++ b/tests/test_integration_service.py @@ -1,4 +1,4 @@ -import os +import os import unittest from unittest.mock import AsyncMock, patch @@ -9,8 +9,8 @@ from sqlalchemy.pool import StaticPool os.environ.setdefault("DEBUG", "false") from app.db.mock_database import MockBase -from app.db.mock_models import IntegrationDelivery -from app.services.integrations.events import ORDER_CREATED_EVENT +from app.db.mock_models import IntegrationDelivery, IntegrationRoute, User +from app.services.integrations.events import ORDER_CREATED_EVENT, REVIEW_SCHEDULED_EVENT from app.services.integrations import service as integration_service @@ -26,6 +26,29 @@ class IntegrationServiceTests(unittest.IsolatedAsyncioTestCase): self.addCleanup(engine.dispose) return SessionLocal + def _create_user( + self, + SessionLocal, + *, + user_id: int, + email: str | None, + name: str = "Cliente Teste", + ) -> None: + db = SessionLocal() + try: + db.add( + User( + id=user_id, + channel="telegram", + external_id=f"tg-{user_id}", + name=name, + email=email, + ) + ) + db.commit() + finally: + db.close() + async def test_emit_business_event_creates_and_dispatches_delivery(self): SessionLocal = self._build_session_local() @@ -66,15 +89,62 @@ class IntegrationServiceTests(unittest.IsolatedAsyncioTestCase): self.assertEqual(len(deliveries), 1) self.assertEqual(deliveries[0]["status"], "sent") self.assertEqual(deliveries[0]["provider_message_id"], "brevo-123") + self.assertEqual(deliveries[0]["recipient_email"], "ops@example.com") db = SessionLocal() try: stored = db.query(IntegrationDelivery).one() self.assertEqual(stored.status, "sent") self.assertEqual(stored.provider_message_id, "brevo-123") + self.assertEqual(stored.recipient_email, "ops@example.com") finally: db.close() + async def test_emit_business_event_passes_route_provider_config_to_provider(self): + SessionLocal = self._build_session_local() + fake_provider = type( + "FakeProvider", + (), + { + "send_email": AsyncMock(return_value={"message_id": "brevo-config"}), + }, + )() + provider_config = { + "sender": {"email": "noreply@example.com", "name": "Operacoes"}, + "reply_to": {"email": "atendimento@example.com", "name": "Atendimento"}, + "tags": ["ops"], + } + + with patch.object(integration_service, "SessionMockLocal", SessionLocal), patch.object( + integration_service.settings, + "integrations_enabled", + True, + ), patch.object( + integration_service.settings, + "integration_sync_delivery_enabled", + True, + ), patch.object( + integration_service, + "_get_provider", + return_value=fake_provider, + ): + integration_service.upsert_email_integration_route( + event_type=ORDER_CREATED_EVENT, + recipient_email="ops@example.com", + provider_config=provider_config, + ) + await integration_service.emit_business_event( + ORDER_CREATED_EVENT, + { + "numero_pedido": "PED-1", + }, + ) + + fake_provider.send_email.assert_awaited_once() + kwargs = fake_provider.send_email.await_args.kwargs + self.assertEqual(kwargs["provider_config"], provider_config) + self.assertEqual(kwargs["tags"], [ORDER_CREATED_EVENT]) + async def test_emit_business_event_deduplicates_by_route_and_payload(self): SessionLocal = self._build_session_local() @@ -102,6 +172,155 @@ class IntegrationServiceTests(unittest.IsolatedAsyncioTestCase): finally: db.close() + async def test_emit_business_event_skips_dynamic_user_route_without_saved_email(self): + SessionLocal = self._build_session_local() + self._create_user(SessionLocal, user_id=7, email=None) + + with patch.object(integration_service, "SessionMockLocal", SessionLocal), patch.object( + integration_service.settings, + "integrations_enabled", + True, + ), patch.object( + integration_service.settings, + "integration_sync_delivery_enabled", + False, + ): + integration_service.sync_user_email_integration_routes( + user_id=7, + recipient_email="cliente@example.com", + recipient_name="Cliente Teste", + ) + deliveries = await integration_service.emit_business_event( + ORDER_CREATED_EVENT, + {"numero_pedido": "PED-SEM-EMAIL", "user_id": 7}, + ) + + self.assertEqual(deliveries, []) + + db = SessionLocal() + try: + self.assertEqual(db.query(IntegrationDelivery).count(), 0) + finally: + db.close() + + async def test_emit_business_event_resolves_dynamic_user_recipient_from_user_profile(self): + SessionLocal = self._build_session_local() + self._create_user(SessionLocal, user_id=7, email="cliente@example.com", name="Cliente Teste") + fake_provider = type( + "FakeProvider", + (), + { + "send_email": AsyncMock(return_value={"message_id": "brevo-dynamic"}), + }, + )() + + with patch.object(integration_service, "SessionMockLocal", SessionLocal), patch.object( + integration_service.settings, + "integrations_enabled", + True, + ), patch.object( + integration_service.settings, + "integration_sync_delivery_enabled", + True, + ), patch.object( + integration_service, + "_get_provider", + return_value=fake_provider, + ): + integration_service.sync_user_email_integration_routes( + user_id=7, + recipient_email="cliente@example.com", + recipient_name="Cliente Teste", + ) + deliveries = await integration_service.emit_business_event( + ORDER_CREATED_EVENT, + {"numero_pedido": "PED-DINAMICO", "user_id": 7}, + ) + + self.assertEqual(len(deliveries), 1) + self.assertEqual(deliveries[0]["status"], "sent") + self.assertEqual(deliveries[0]["recipient_email"], "cliente@example.com") + + fake_provider.send_email.assert_awaited_once() + kwargs = fake_provider.send_email.await_args.kwargs + self.assertEqual(kwargs["to_email"], "cliente@example.com") + self.assertEqual(kwargs["to_name"], "Cliente Teste") + + db = SessionLocal() + try: + stored = db.query(IntegrationDelivery).one() + self.assertEqual(stored.recipient_email, "cliente@example.com") + self.assertEqual(stored.recipient_name, "Cliente Teste") + finally: + db.close() + + async def test_sync_user_email_integration_routes_creates_one_global_dynamic_route_per_event(self): + SessionLocal = self._build_session_local() + + with patch.object(integration_service, "SessionMockLocal", SessionLocal): + integration_service.sync_user_email_integration_routes( + user_id=7, + recipient_email="primeiro@example.com", + recipient_name="Cliente Teste", + ) + integration_service.sync_user_email_integration_routes( + user_id=9, + recipient_email="segundo@example.com", + recipient_name="Outro Cliente", + ) + + db = SessionLocal() + try: + routes = ( + db.query(IntegrationRoute) + .filter(IntegrationRoute.event_type == ORDER_CREATED_EVENT) + .order_by(IntegrationRoute.id.asc()) + .all() + ) + self.assertEqual(len(routes), 1) + self.assertEqual(routes[0].recipient_email, integration_service.DYNAMIC_USER_ROUTE_EMAIL) + self.assertTrue(routes[0].enabled) + self.assertEqual( + routes[0].provider_config_json, + integration_service._serialize_json({"recipient_scope": integration_service.USER_PROFILE_ROUTE_SCOPE}), + ) + finally: + db.close() + + async def test_sync_user_email_integration_routes_disables_legacy_user_specific_routes(self): + SessionLocal = self._build_session_local() + + with patch.object(integration_service, "SessionMockLocal", SessionLocal): + integration_service.upsert_email_integration_route( + event_type=ORDER_CREATED_EVENT, + recipient_email="legacy@example.com", + enabled=True, + provider_config={ + "recipient_scope": integration_service.USER_PROFILE_ROUTE_SCOPE, + "user_id": 7, + }, + ) + integration_service.sync_user_email_integration_routes( + user_id=7, + recipient_email="cliente@example.com", + recipient_name="Cliente Teste", + ) + + db = SessionLocal() + try: + routes = ( + db.query(IntegrationRoute) + .filter(IntegrationRoute.event_type == ORDER_CREATED_EVENT) + .order_by(IntegrationRoute.id.asc()) + .all() + ) + self.assertEqual(len(routes), 2) + self.assertFalse(routes[0].enabled) + self.assertTrue(routes[1].enabled) + self.assertEqual(routes[1].recipient_email, integration_service.DYNAMIC_USER_ROUTE_EMAIL) + finally: + db.close() + async def test_process_pending_deliveries_marks_failure_when_provider_fails(self): SessionLocal = self._build_session_local() @@ -131,6 +350,131 @@ class IntegrationServiceTests(unittest.IsolatedAsyncioTestCase): self.assertEqual(deliveries[0]["status"], "failed") self.assertIn("brevo offline", deliveries[0]["last_error"]) + async def test_list_integration_deliveries_filters_by_status_and_event(self): + SessionLocal = self._build_session_local() + + with patch.object(integration_service, "SessionMockLocal", SessionLocal), patch.object( + integration_service.settings, + "integrations_enabled", + True, + ), patch.object( + integration_service.settings, + "integration_sync_delivery_enabled", + False, + ): + integration_service.upsert_email_integration_route( + event_type=ORDER_CREATED_EVENT, + recipient_email="ops@example.com", + ) + integration_service.upsert_email_integration_route( + event_type=REVIEW_SCHEDULED_EVENT, + recipient_email="review@example.com", + ) + await integration_service.emit_business_event(ORDER_CREATED_EVENT, {"numero_pedido": "PED-1"}) + await integration_service.emit_business_event(REVIEW_SCHEDULED_EVENT, {"protocolo": "REV-1"}) + + db = SessionLocal() + try: + order_delivery = ( + db.query(IntegrationDelivery) + .filter(IntegrationDelivery.event_type == ORDER_CREATED_EVENT) + .one() + ) + order_delivery.status = "failed" + db.commit() + finally: + db.close() + + with patch.object(integration_service, "SessionMockLocal", SessionLocal): + deliveries = integration_service.list_integration_deliveries( + statuses=["failed"], + event_type=ORDER_CREATED_EVENT, + limit=10, + ) + + self.assertEqual(len(deliveries), 1) + self.assertEqual(deliveries[0]["event_type"], ORDER_CREATED_EVENT) + self.assertEqual(deliveries[0]["status"], "failed") + self.assertEqual(deliveries[0]["recipient_email"], "ops@example.com") + + async def test_process_pending_deliveries_respects_status_and_event_filters(self): + SessionLocal = self._build_session_local() + + with patch.object(integration_service, "SessionMockLocal", SessionLocal), patch.object( + integration_service.settings, + "integrations_enabled", + True, + ), patch.object( + integration_service.settings, + "integration_sync_delivery_enabled", + False, + ): + integration_service.upsert_email_integration_route( + event_type=ORDER_CREATED_EVENT, + recipient_email="ops@example.com", + ) + integration_service.upsert_email_integration_route( + event_type=REVIEW_SCHEDULED_EVENT, + recipient_email="review@example.com", + ) + await integration_service.emit_business_event(ORDER_CREATED_EVENT, {"numero_pedido": "PED-1"}) + await integration_service.emit_business_event(REVIEW_SCHEDULED_EVENT, {"protocolo": "REV-1"}) + + fake_provider = type( + "FakeProvider", + (), + { + "send_email": AsyncMock(return_value={"message_id": "brevo-filtered"}), + }, + )() + + with patch.object(integration_service, "SessionMockLocal", SessionLocal), patch.object( + integration_service, + "_get_provider", + return_value=fake_provider, + ): + deliveries = await integration_service.process_pending_deliveries( + statuses=["pending"], + event_type=REVIEW_SCHEDULED_EVENT, + limit=10, + ) + + self.assertEqual(len(deliveries), 1) + self.assertEqual(deliveries[0]["event_type"], REVIEW_SCHEDULED_EVENT) + self.assertEqual(deliveries[0]["status"], "sent") + + db = SessionLocal() + try: + rows = ( + db.query(IntegrationDelivery) + .order_by(IntegrationDelivery.event_type.asc(), IntegrationDelivery.id.asc()) + .all() + ) + self.assertEqual(rows[0].status, "pending") + self.assertEqual(rows[1].status, "sent") + finally: + db.close() + + def test_list_integration_routes_filters_disabled_routes(self): + SessionLocal = self._build_session_local() + + with patch.object(integration_service, "SessionMockLocal", SessionLocal): + integration_service.upsert_email_integration_route( + event_type=ORDER_CREATED_EVENT, + recipient_email="ops@example.com", + enabled=True, + ) + integration_service.upsert_email_integration_route( + event_type=REVIEW_SCHEDULED_EVENT, + recipient_email="review@example.com", + enabled=False, + ) + routes = integration_service.list_integration_routes(enabled=False) + + self.assertEqual(len(routes), 1) + self.assertEqual(routes[0]["event_type"], REVIEW_SCHEDULED_EVENT) + self.assertFalse(routes[0]["enabled"]) + if __name__ == "__main__": unittest.main() diff --git a/tests/test_turn_decision_contract.py b/tests/test_turn_decision_contract.py index 29c2b52..6b654f2 100644 --- a/tests/test_turn_decision_contract.py +++ b/tests/test_turn_decision_contract.py @@ -1,7 +1,7 @@ import os import unittest from types import SimpleNamespace -from unittest.mock import patch +from unittest.mock import AsyncMock, patch from sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker @@ -13,6 +13,7 @@ from datetime import datetime, timedelta from app.core.time_utils import utc_now from app.db.mock_database import MockBase from app.db.mock_models import RentalContract, RentalPayment, RentalVehicle +from app.services.integrations.events import ORDER_CREATED_EVENT from app.services.orchestration.conversation_policy import ConversationPolicy from app.services.orchestration.entity_normalizer import EntityNormalizer @@ -1846,6 +1847,98 @@ class TurnDecisionContractTests(unittest.IsolatedAsyncioTestCase): self.assertTrue(prioritized) + def test_should_prioritize_review_flow_for_review_schedule_intent_without_prefilled_fields(self): + service = OrquestradorService.__new__(OrquestradorService) + service.state = FakeState() + service.normalizer = EntityNormalizer() + service._get_user_context = lambda user_id: None + + prioritized = service._should_prioritize_review_flow( + turn_decision={"intent": "review_schedule", "domain": "review", "action": "ask_missing_fields"}, + extracted_entities={ + "generic_memory": {}, + "review_fields": {}, + "review_management_fields": {}, + "order_fields": {}, + "cancel_order_fields": {}, + "intents": {}, + }, + user_id=1, + ) + + self.assertTrue(prioritized) + + async def test_review_schedule_direct_flow_captures_email_side_effects_after_success(self): + state = FakeState( + entries={ + "pending_review_drafts": { + 1: { + "payload": { + "placa": "ABC1D23", + "data_hora": "2026-03-25T10:00:00", + "modelo": "Onix", + "ano": 2022, + "km": 35000, + "revisao_previa_concessionaria": False, + }, + "expires_at": utc_now() + timedelta(minutes=15), + } + } + }, + contexts={ + 1: { + "active_domain": "review", + "active_task": "review_schedule", + "generic_memory": {}, + "shared_memory": {}, + "order_queue": [], + "pending_order_selection": None, + "pending_switch": None, + "last_stock_results": [], + "selected_vehicle": None, + "expires_at": utc_now() + timedelta(minutes=15), + } + }, + ) + service = OrquestradorService.__new__(OrquestradorService) + service.state = state + service.normalizer = EntityNormalizer() + service.tool_executor = FakeToolExecutor( + result={ + "protocolo": "REV-20260325-084279F3", + "placa": "ABC1D23", + "data_hora": "2026-03-25T10:00:00", + "modelo": "Onix", + "ano": 2022, + "km": 35000, + "valor_revisao": 906.0, + "status": "agendado", + "user_id": 1, + } + ) + service._get_user_context = lambda user_id: state.get_user_context(user_id) + service._save_user_context = lambda user_id, context: state.save_user_context(user_id, context) + service._try_prefill_review_fields_from_memory = lambda user_id, payload: None + service._store_last_review_package = lambda user_id, payload: None + service._log_review_flow_source = lambda **kwargs: None + service._fallback_format_tool_result = lambda tool_name, tool_result: "Revisao agendada com sucesso." + captured = [] + service._capture_successful_tool_side_effects = lambda **kwargs: captured.append(kwargs) + + response = await service._try_collect_and_schedule_review( + message="ok", + user_id=1, + extracted_fields={}, + intents={}, + turn_decision={"intent": "review_schedule", "domain": "review", "action": "collect_review_schedule"}, + ) + + self.assertEqual(response, "Revisao agendada com sucesso.") + self.assertEqual(len(captured), 1) + self.assertEqual(captured[0]["tool_name"], "agendar_revisao") + self.assertEqual(captured[0]["user_id"], 1) + self.assertEqual(captured[0]["tool_result"]["protocolo"], "REV-20260325-084279F3") + async def test_handle_message_prioritizes_review_management_over_model_answer_for_reschedule_intent(self): state = FakeState( contexts={ @@ -2604,7 +2697,8 @@ class TurnDecisionContractTests(unittest.IsolatedAsyncioTestCase): user_id=1, ) - self.assertEqual(response, "devolucao ok") + self.assertTrue(response.startswith("devolucao ok")) + self.assertIn("Se quiser, posso te enviar esse resumo por e-mail.", response) self.assertEqual( service.tool_executor.calls, [ @@ -2703,7 +2797,8 @@ class TurnDecisionContractTests(unittest.IsolatedAsyncioTestCase): user_id=1, ) - self.assertEqual(response, "pagamento ok") + self.assertTrue(response.startswith("pagamento ok")) + self.assertIn("Se quiser, posso te enviar esse resumo por e-mail.", response) self.assertEqual( service.tool_executor.calls, [ @@ -2809,7 +2904,8 @@ class TurnDecisionContractTests(unittest.IsolatedAsyncioTestCase): user_id=1, ) - self.assertEqual(response, "pagamento ok") + self.assertTrue(response.startswith("pagamento ok")) + self.assertIn("Se quiser, posso te enviar esse resumo por e-mail.", response) self.assertEqual( service.tool_executor.calls, [ @@ -4691,7 +4787,122 @@ class TurnDecisionContractTests(unittest.IsolatedAsyncioTestCase): self.assertIsNone(response) -if __name__ == "__main__": - unittest.main() +class OrquestradorEmailCaptureTests(unittest.IsolatedAsyncioTestCase): + def _build_service(self, state=None): + service = OrquestradorService.__new__(OrquestradorService) + service.state = state or FakeState() + service.normalizer = EntityNormalizer() + service._turn_trace = {"request_id": "req-1"} + return service + + def test_stage_email_capture_request_and_prompt_for_current_turn(self): + service = self._build_service() + service._get_saved_user_email = lambda user_id: None + + service._stage_email_capture_request( + tool_name="realizar_pedido", + tool_result={"numero_pedido": "PED-1"}, + user_id=7, + ) + + pending = service.state.get_entry("pending_email_capture_requests", 7) + self.assertIsNotNone(pending) + self.assertEqual(pending["event_type"], ORDER_CREATED_EVENT) + self.assertEqual(pending["payload"]["numero_pedido"], "PED-1") + self.assertEqual(pending["payload"]["user_id"], 7) + + response = service._append_email_capture_prompt_if_needed( + response="Pedido criado com sucesso.", + user_id=7, + ) + self.assertIn("Se quiser, posso te enviar esse resumo por e-mail.", response) + + async def test_pending_email_capture_decline_clears_request(self): + state = FakeState( + entries={ + "pending_email_capture_requests": { + 7: { + "request_id": "req-1", + "event_type": ORDER_CREATED_EVENT, + "payload": {"numero_pedido": "PED-1", "user_id": 7}, + "expires_at": utc_now() + timedelta(minutes=15), + } + } + } + ) + service = self._build_service(state=state) + + response = await service._try_handle_pending_email_capture_message( + message="prefiro nao informar", + user_id=7, + ) + + self.assertEqual(response, "Tudo bem. Nao vou enviar este resumo por e-mail.") + self.assertIsNone(state.get_entry("pending_email_capture_requests", 7)) + + def test_ensure_user_email_routes_syncs_global_routes_only_once(self): + service = self._build_service() + service._get_user_record = lambda user_id: SimpleNamespace( + id=user_id, + email="cliente@example.com", + name="Cliente Teste", + ) + + with patch( + "app.services.orchestration.orquestrador_service.sync_user_email_integration_routes" + ) as sync_routes_mock: + service._ensure_user_email_routes(user_id=7) + service._ensure_user_email_routes(user_id=7) + + sync_routes_mock.assert_called_once_with( + user_id=7, + recipient_email="cliente@example.com", + recipient_name="Cliente Teste", + ) + + async def test_pending_email_capture_success_saves_email_and_reemits_event(self): + state = FakeState( + entries={ + "pending_email_capture_requests": { + 7: { + "request_id": "req-1", + "event_type": ORDER_CREATED_EVENT, + "payload": {"numero_pedido": "PED-1", "user_id": 7}, + "expires_at": utc_now() + timedelta(minutes=15), + } + } + } + ) + service = self._build_service(state=state) + saved = {} + ensured_routes = [] + + def fake_save_user_email(user_id: int | None, email: str | None): + saved["user_id"] = user_id + saved["email"] = email + return SimpleNamespace(id=user_id, email=email, name="Cliente Teste") + + service._save_user_email = fake_save_user_email + service._ensure_user_email_routes = lambda user_id: ensured_routes.append(user_id) + + with patch( + "app.services.orchestration.orquestrador_service.emit_business_event", + new=AsyncMock(return_value=[{"status": "sent", "provider_message_id": "brevo-1"}]), + ) as emit_business_event_mock: + response = await service._try_handle_pending_email_capture_message( + message="cliente@example.com", + user_id=7, + ) + + self.assertEqual(saved, {"user_id": 7, "email": "cliente@example.com"}) + self.assertEqual(ensured_routes, [7]) + emit_business_event_mock.assert_awaited_once_with( + event_type=ORDER_CREATED_EVENT, + payload={"numero_pedido": "PED-1", "user_id": 7}, + ) + self.assertIn("enviei este resumo por la", response) + self.assertIsNone(state.get_entry("pending_email_capture_requests", 7)) +if __name__ == "__main__": + unittest.main()