diff --git a/.env.example b/.env.example
index ebae699..eb830ff 100644
--- a/.env.example
+++ b/.env.example
@@ -62,3 +62,16 @@ CONVERSATION_STATE_TTL_MINUTES=60
REDIS_URL=redis://127.0.0.1:6379/0
REDIS_KEY_PREFIX=orquestrador
REDIS_SOCKET_TIMEOUT_SECONDS=5
+
+# ============================================
+# INTEGRACOES EXTERNAS
+# ============================================
+
+INTEGRATIONS_ENABLED=false
+INTEGRATION_SYNC_DELIVERY_ENABLED=true
+
+BREVO_API_KEY=
+BREVO_BASE_URL=https://api.brevo.com/v3
+BREVO_SENDER_EMAIL=
+BREVO_SENDER_NAME=Orquestrador
+BREVO_REQUEST_TIMEOUT_SECONDS=10
diff --git a/README.md b/README.md
index 3ebe62d..9287c35 100644
--- a/README.md
+++ b/README.md
@@ -152,6 +152,10 @@ app/
mock_customer_service.py
user_service.py
scripts/
+ list_integration_deliveries.py
+ list_integration_routes.py
+ process_integration_deliveries.py
+ upsert_integration_route.py
stress_smoke.py
tests/
...
@@ -305,6 +309,16 @@ Principais variaveis:
- `REDIS_KEY_PREFIX`
- `REDIS_SOCKET_TIMEOUT_SECONDS`
+### Integracoes externas
+
+- `INTEGRATIONS_ENABLED`
+- `INTEGRATION_SYNC_DELIVERY_ENABLED`
+- `BREVO_API_KEY`
+- `BREVO_BASE_URL`
+- `BREVO_SENDER_EMAIL`
+- `BREVO_SENDER_NAME`
+- `BREVO_REQUEST_TIMEOUT_SECONDS`
+
### Ambiente
- `ENVIRONMENT`
@@ -357,6 +371,24 @@ Se voce estiver usando um arquivo de ambiente dedicado:
python -m dotenv -f .env.local run -- python scripts/stress_smoke.py --backend memory --state-iterations 200 --order-cycles 30 --race-attempts 8 --user-base 995000 --cpf 11144477735
```
+Operacao basica do outbox de integracoes:
+
+```bash
+python scripts/upsert_integration_route.py --event order.created --recipient ops@empresa.com
+python scripts/list_integration_routes.py --enabled
+python scripts/list_integration_deliveries.py --status failed --limit 20
+python scripts/process_integration_deliveries.py --status failed --limit 20
+```
+
+Exemplo de configuracao mais completa da rota Brevo:
+
+```bash
+python scripts/upsert_integration_route.py --event order.created --recipient ops@empresa.com --sender-email noreply@empresa.com --sender-name Operacoes --reply-to-email atendimento@empresa.com --reply-to-name Atendimento --cc financeiro@empresa.com --tag pedidos --tag operacao --header X-Canal=orquestrador
+```
+
+Campos aceitos no `provider_config` da rota: `sender`, `reply_to`, `cc`, `bcc`, `tags`, `headers` e `html_content`.
+Para casos mais avancados, o script tambem aceita `--provider-config-json` com um objeto JSON.
+
## Deploy
O deploy de servidor fica documentado em [DEPLOY_SERVIDOR.md](DEPLOY_SERVIDOR.md).
diff --git a/app/db/bootstrap.py b/app/db/bootstrap.py
index 14785e1..d4dad3a 100644
--- a/app/db/bootstrap.py
+++ b/app/db/bootstrap.py
@@ -3,6 +3,8 @@ Rotina dedicada de bootstrap de banco de dados.
Cria tabelas e executa seed inicial de forma explicita, fora do startup do app.
"""
+from sqlalchemy import inspect, text
+
from app.core.settings import settings
from app.db.database import Base, engine
from app.db.mock_database import MockBase, mock_engine
@@ -24,6 +26,28 @@ from app.db.mock_seed import seed_mock_data
from app.db.tool_seed import seed_tools
+def _ensure_mock_schema_evolution() -> None:
+ inspector = inspect(mock_engine)
+ table_names = set(inspector.get_table_names())
+ if "users" in table_names:
+ user_columns = {column["name"] for column in inspector.get_columns("users")}
+ if "email" not in user_columns:
+ with mock_engine.begin() as connection:
+ connection.execute(text("ALTER TABLE users ADD COLUMN email VARCHAR(255)"))
+
+ if "integration_deliveries" in table_names:
+ delivery_columns = {column["name"] for column in inspector.get_columns("integration_deliveries")}
+ statements: list[str] = []
+ if "recipient_email" not in delivery_columns:
+ statements.append("ALTER TABLE integration_deliveries ADD COLUMN recipient_email VARCHAR(255)")
+ if "recipient_name" not in delivery_columns:
+ statements.append("ALTER TABLE integration_deliveries ADD COLUMN recipient_name VARCHAR(120)")
+ if statements:
+ with mock_engine.begin() as connection:
+ for statement in statements:
+ connection.execute(text(statement))
+
+
def bootstrap_databases(
*,
run_tools_seed: bool | None = None,
@@ -56,6 +80,7 @@ def bootstrap_databases(
try:
print("Criando tabelas MySQL (dados ficticios)...")
MockBase.metadata.create_all(bind=mock_engine)
+ _ensure_mock_schema_evolution()
if should_seed_mock:
print("Populando dados ficticios iniciais...")
seed_mock_data()
diff --git a/app/db/mock_models.py b/app/db/mock_models.py
index 7fc6831..05b4853 100644
--- a/app/db/mock_models.py
+++ b/app/db/mock_models.py
@@ -40,6 +40,7 @@ class User(MockBase):
username = Column(String(120), nullable=True)
cpf = Column(String(11), ForeignKey("customers.cpf"), nullable=True, index=True)
phone = Column(String(30), nullable=True)
+ email = Column(String(255), nullable=True, index=True)
created_at = Column(DateTime, server_default=func.current_timestamp())
updated_at = Column(
DateTime,
@@ -194,6 +195,8 @@ class IntegrationDelivery(MockBase):
provider = Column(String(40), nullable=False, index=True)
status = Column(String(20), nullable=False, default="pending", index=True)
payload_json = Column(Text, nullable=False)
+ recipient_email = Column(String(255), nullable=True, index=True)
+ recipient_name = Column(String(120), nullable=True)
rendered_subject = Column(Text, nullable=True)
rendered_body = Column(Text, nullable=True)
provider_message_id = Column(String(120), nullable=True, index=True)
diff --git a/app/repositories/user_repository.py b/app/repositories/user_repository.py
index a7ea49a..d9ee532 100644
--- a/app/repositories/user_repository.py
+++ b/app/repositories/user_repository.py
@@ -19,6 +19,12 @@ class UserRepository:
.first()
)
+ def get_by_id(self, user_id: int | None):
+ """Busca usuario pelo identificador interno."""
+ if user_id is None:
+ return None
+ return self.db.query(User).filter(User.id == user_id).first()
+
def create(
self,
channel: str,
@@ -58,3 +64,18 @@ class UserRepository:
self.db.refresh(user)
return user
+
+ def update_email(
+ self,
+ user: User,
+ email: str | None,
+ ):
+ """Atualiza email persistido do usuario."""
+ normalized_email = str(email or "").strip().lower() or None
+ if normalized_email == user.email:
+ return user
+
+ user.email = normalized_email
+ self.db.commit()
+ self.db.refresh(user)
+ return user
diff --git a/app/services/domain/rental_service.py b/app/services/domain/rental_service.py
index 1562dcd..1c5cfe7 100644
--- a/app/services/domain/rental_service.py
+++ b/app/services/domain/rental_service.py
@@ -37,7 +37,7 @@ def _parse_optional_datetime(value: str | None, *, field_name: str) -> datetime
if not text:
return None
- normalized = re.sub(r"\s+(?:as|às)\s+", " ", text, flags=re.IGNORECASE)
+ normalized = re.sub(r"\s+(?:as|Ã s)\s+", " ", text, flags=re.IGNORECASE)
for candidate in (text, normalized):
try:
return datetime.fromisoformat(candidate.replace("Z", "+00:00"))
@@ -408,6 +408,7 @@ async def abrir_locacao_aluguel(
"status_veiculo": vehicle.status,
"cpf": contract.cpf,
"nome_cliente": _normalize_text_field(nome_cliente),
+ "user_id": contract.user_id,
}
await publish_business_event_safely(RENTAL_OPENED_EVENT, result)
return result
@@ -475,6 +476,7 @@ async def registrar_devolucao_aluguel(
"valor_final": float(contract.valor_final) if contract.valor_final is not None else None,
"status": contract.status,
"status_veiculo": vehicle.status if vehicle is not None else None,
+ "user_id": contract.user_id,
}
await publish_business_event_safely(RENTAL_RETURN_REGISTERED_EVENT, result)
return result
@@ -549,6 +551,7 @@ async def registrar_pagamento_aluguel(
"favorecido": record.favorecido,
"identificador_comprovante": record.identificador_comprovante,
"status": "registrado",
+ "user_id": record.user_id,
}
await publish_business_event_safely(RENTAL_PAYMENT_REGISTERED_EVENT, result)
return result
diff --git a/app/services/flows/order_flow.py b/app/services/flows/order_flow.py
index 237ae35..ecd07cd 100644
--- a/app/services/flows/order_flow.py
+++ b/app/services/flows/order_flow.py
@@ -970,6 +970,16 @@ class OrderFlowMixin:
active_task="order_create",
)
self._reset_order_stock_context(user_id=user_id)
+ if hasattr(self, "_capture_successful_tool_side_effects"):
+ self._capture_successful_tool_side_effects(
+ tool_name="realizar_pedido",
+ arguments={
+ "cpf": draft["payload"]["cpf"],
+ "vehicle_id": draft["payload"]["vehicle_id"],
+ },
+ tool_result=tool_result,
+ user_id=user_id,
+ )
return self._fallback_format_tool_result("realizar_pedido", tool_result)
@@ -1090,6 +1100,13 @@ class OrderFlowMixin:
"order_cancel",
active_task="order_cancel",
)
+ if hasattr(self, "_capture_successful_tool_side_effects"):
+ self._capture_successful_tool_side_effects(
+ tool_name="cancelar_pedido",
+ arguments=draft["payload"],
+ tool_result=tool_result,
+ user_id=user_id,
+ )
return self._fallback_format_tool_result("cancelar_pedido", tool_result)
diff --git a/app/services/flows/rental_flow.py b/app/services/flows/rental_flow.py
index 4fa0122..ea92363 100644
--- a/app/services/flows/rental_flow.py
+++ b/app/services/flows/rental_flow.py
@@ -612,5 +612,18 @@ class RentalFlowMixin:
self._store_last_rental_contract(user_id=user_id, payload=tool_result)
self._reset_pending_rental_states(user_id=user_id)
+ if hasattr(self, "_capture_successful_tool_side_effects"):
+ self._capture_successful_tool_side_effects(
+ tool_name="abrir_locacao_aluguel",
+ arguments={
+ "rental_vehicle_id": draft_payload["rental_vehicle_id"],
+ "placa": draft_payload.get("placa"),
+ "data_inicio": draft_payload["data_inicio"],
+ "data_fim_prevista": draft_payload["data_fim_prevista"],
+ "cpf": draft_payload.get("cpf"),
+ },
+ tool_result=tool_result,
+ user_id=user_id,
+ )
return self._fallback_format_tool_result("abrir_locacao_aluguel", tool_result)
diff --git a/app/services/flows/review_flow.py b/app/services/flows/review_flow.py
index 5f876f6..f02f542 100644
--- a/app/services/flows/review_flow.py
+++ b/app/services/flows/review_flow.py
@@ -899,5 +899,12 @@ class ReviewFlowMixin:
)
self._store_last_review_package(user_id=user_id, payload=draft["payload"])
self._log_review_flow_source(source=review_flow_source or "draft", payload=draft["payload"])
+ if hasattr(self, "_capture_successful_tool_side_effects"):
+ self._capture_successful_tool_side_effects(
+ tool_name="agendar_revisao",
+ arguments=draft["payload"],
+ tool_result=tool_result,
+ user_id=user_id,
+ )
return self._fallback_format_tool_result("agendar_revisao", tool_result)
diff --git a/app/services/integrations/__init__.py b/app/services/integrations/__init__.py
index ad1530a..42f9da1 100644
--- a/app/services/integrations/__init__.py
+++ b/app/services/integrations/__init__.py
@@ -1,4 +1,4 @@
-from app.services.integrations.events import (
+from app.services.integrations.events import (
ORDER_CANCELLED_EVENT,
ORDER_CREATED_EVENT,
RENTAL_OPENED_EVENT,
@@ -8,10 +8,13 @@ from app.services.integrations.events import (
SUPPORTED_EVENT_TYPES,
)
from app.services.integrations.service import (
+ SUPPORTED_DELIVERY_STATUSES,
emit_business_event,
+ list_integration_deliveries,
list_integration_routes,
process_pending_deliveries,
publish_business_event_safely,
+ sync_user_email_integration_routes,
upsert_email_integration_route,
)
@@ -23,9 +26,12 @@ __all__ = [
"RENTAL_RETURN_REGISTERED_EVENT",
"REVIEW_SCHEDULED_EVENT",
"SUPPORTED_EVENT_TYPES",
+ "SUPPORTED_DELIVERY_STATUSES",
"emit_business_event",
+ "list_integration_deliveries",
"list_integration_routes",
"process_pending_deliveries",
"publish_business_event_safely",
+ "sync_user_email_integration_routes",
"upsert_email_integration_route",
]
diff --git a/app/services/integrations/providers.py b/app/services/integrations/providers.py
index 8dd0f51..8c07491 100644
--- a/app/services/integrations/providers.py
+++ b/app/services/integrations/providers.py
@@ -1,3 +1,6 @@
+from collections.abc import Mapping
+from typing import Any
+
import httpx
from app.core.settings import settings
@@ -7,6 +10,67 @@ class IntegrationProviderError(RuntimeError):
"""Erro de transporte ou configuracao em providers externos."""
+def _clean_text(value: Any) -> str | None:
+ text = str(value or "").strip()
+ return text or None
+
+
+def _normalize_address(value: Any, *, fallback_name: Any = None) -> dict[str, str] | None:
+ if isinstance(value, Mapping):
+ email = _clean_text(value.get("email"))
+ name = _clean_text(value.get("name"))
+ else:
+ email = _clean_text(value)
+ name = _clean_text(fallback_name)
+
+ if not email:
+ return None
+
+ address = {"email": email}
+ if name:
+ address["name"] = name
+ return address
+
+
+def _normalize_address_list(value: Any) -> list[dict[str, str]]:
+ if value is None:
+ return []
+
+ candidates = value if isinstance(value, (list, tuple, set)) else [value]
+ addresses: list[dict[str, str]] = []
+ for candidate in candidates:
+ address = _normalize_address(candidate)
+ if address:
+ addresses.append(address)
+ return addresses
+
+
+def _normalize_tags(*groups: Any) -> list[str]:
+ merged: list[str] = []
+ for group in groups:
+ if group is None:
+ continue
+ candidates = group if isinstance(group, (list, tuple, set)) else [group]
+ for candidate in candidates:
+ tag = _clean_text(candidate)
+ if tag and tag not in merged:
+ merged.append(tag)
+ return merged
+
+
+def _normalize_headers(value: Any) -> dict[str, str]:
+ if not isinstance(value, Mapping):
+ return {}
+
+ headers: dict[str, str] = {}
+ for key, header_value in value.items():
+ normalized_key = _clean_text(key)
+ normalized_value = _clean_text(header_value)
+ if normalized_key and normalized_value:
+ headers[normalized_key] = normalized_value
+ return headers
+
+
class BrevoEmailProvider:
provider_name = "brevo_email"
@@ -17,8 +81,30 @@ class BrevoEmailProvider:
self.sender_name = str(settings.brevo_sender_name or "Orquestrador").strip() or "Orquestrador"
self.timeout_seconds = max(1, int(settings.brevo_request_timeout_seconds or 10))
- def is_configured(self) -> bool:
- return bool(self.api_key and self.sender_email)
+ def _normalize_provider_config(self, provider_config: Mapping[str, Any] | None) -> dict[str, Any]:
+ if not isinstance(provider_config, Mapping):
+ return {}
+ return dict(provider_config)
+
+ def _resolve_sender(self, provider_config: Mapping[str, Any]) -> tuple[str | None, str]:
+ sender = provider_config.get("sender")
+ sender_payload = _normalize_address(sender) if sender is not None else None
+ sender_email = (
+ (sender_payload or {}).get("email")
+ or _clean_text(provider_config.get("sender_email"))
+ or self.sender_email
+ )
+ sender_name = (
+ (sender_payload or {}).get("name")
+ or _clean_text(provider_config.get("sender_name"))
+ or self.sender_name
+ )
+ return sender_email, sender_name or "Orquestrador"
+
+ def is_configured(self, provider_config: Mapping[str, Any] | None = None) -> bool:
+ normalized_provider_config = self._normalize_provider_config(provider_config)
+ sender_email, _sender_name = self._resolve_sender(normalized_provider_config)
+ return bool(self.api_key and sender_email)
async def send_email(
self,
@@ -28,16 +114,19 @@ class BrevoEmailProvider:
subject: str,
body: str,
tags: list[str] | None = None,
+ provider_config: Mapping[str, Any] | None = None,
) -> dict:
- if not self.is_configured():
+ normalized_provider_config = self._normalize_provider_config(provider_config)
+ sender_email, sender_name = self._resolve_sender(normalized_provider_config)
+ if not self.is_configured(normalized_provider_config):
raise IntegrationProviderError(
"Brevo nao configurado. Defina BREVO_API_KEY e BREVO_SENDER_EMAIL para enviar emails."
)
payload = {
"sender": {
- "email": self.sender_email,
- "name": self.sender_name,
+ "email": sender_email,
+ "name": sender_name,
},
"to": [
{
@@ -48,8 +137,33 @@ class BrevoEmailProvider:
"subject": str(subject or "").strip(),
"textContent": str(body or "").strip(),
}
- if tags:
- payload["tags"] = [str(tag).strip() for tag in tags if str(tag).strip()]
+
+ reply_to = _normalize_address(
+ normalized_provider_config.get("reply_to") or normalized_provider_config.get("reply_to_email"),
+ fallback_name=normalized_provider_config.get("reply_to_name"),
+ )
+ if reply_to:
+ payload["replyTo"] = reply_to
+
+ cc = _normalize_address_list(normalized_provider_config.get("cc"))
+ if cc:
+ payload["cc"] = cc
+
+ bcc = _normalize_address_list(normalized_provider_config.get("bcc"))
+ if bcc:
+ payload["bcc"] = bcc
+
+ merged_tags = _normalize_tags(tags, normalized_provider_config.get("tags"))
+ if merged_tags:
+ payload["tags"] = merged_tags
+
+ headers = _normalize_headers(normalized_provider_config.get("headers"))
+ if headers:
+ payload["headers"] = headers
+
+ html_content = _clean_text(normalized_provider_config.get("html_content"))
+ if html_content:
+ payload["htmlContent"] = html_content
headers = {
"accept": "application/json",
diff --git a/app/services/integrations/service.py b/app/services/integrations/service.py
index d10112a..52090c3 100644
--- a/app/services/integrations/service.py
+++ b/app/services/integrations/service.py
@@ -3,13 +3,20 @@ import json
import logging
from typing import Any
-from sqlalchemy import or_
-
from app.core.settings import settings
from app.core.time_utils import utc_now
from app.db.mock_database import SessionMockLocal
from app.db.mock_models import IntegrationDelivery, IntegrationRoute
-from app.services.integrations.events import SUPPORTED_EVENT_TYPES
+from app.repositories.user_repository import UserRepository
+from app.services.integrations.events import (
+ ORDER_CANCELLED_EVENT,
+ ORDER_CREATED_EVENT,
+ RENTAL_OPENED_EVENT,
+ RENTAL_PAYMENT_REGISTERED_EVENT,
+ RENTAL_RETURN_REGISTERED_EVENT,
+ REVIEW_SCHEDULED_EVENT,
+ SUPPORTED_EVENT_TYPES,
+)
from app.services.integrations.providers import BrevoEmailProvider, IntegrationProviderError
from app.services.integrations.templates import render_email_content
@@ -19,6 +26,23 @@ logger = logging.getLogger(__name__)
_PROVIDER_FACTORIES = {
"brevo_email": BrevoEmailProvider,
}
+SUPPORTED_DELIVERY_STATUSES = (
+ "pending",
+ "failed",
+ "sent",
+ "skipped",
+)
+USER_PROFILE_ROUTE_SCOPE = "user_profile"
+USER_PROFILE_EVENT_TYPES = (
+ ORDER_CREATED_EVENT,
+ ORDER_CANCELLED_EVENT,
+ REVIEW_SCHEDULED_EVENT,
+ RENTAL_OPENED_EVENT,
+ RENTAL_PAYMENT_REGISTERED_EVENT,
+ RENTAL_RETURN_REGISTERED_EVENT,
+)
+DYNAMIC_USER_ROUTE_EMAIL = "dynamic:user_profile"
+DYNAMIC_USER_ROUTE_NAME = "Usuario do fluxo"
def _clean_text(value: str | None) -> str | None:
@@ -48,12 +72,52 @@ def _validate_event_type(event_type: str) -> str:
return normalized
+def _normalize_int(value: Any) -> int | None:
+ if value is None:
+ return None
+ text = str(value).strip()
+ if not text:
+ return None
+ try:
+ return int(text)
+ except (TypeError, ValueError):
+ return None
+
+
+def _normalize_delivery_statuses(statuses: str | list[str] | tuple[str, ...] | None) -> list[str] | None:
+ if statuses is None:
+ return None
+
+ if isinstance(statuses, str):
+ candidates = [statuses]
+ else:
+ candidates = list(statuses)
+
+ normalized_statuses: list[str] = []
+ for status in candidates:
+ normalized = _clean_text(status)
+ if not normalized:
+ continue
+ if normalized not in SUPPORTED_DELIVERY_STATUSES:
+ raise ValueError(f"unsupported integration delivery status: {status}")
+ if normalized not in normalized_statuses:
+ normalized_statuses.append(normalized)
+
+ return normalized_statuses or None
+
+
def _build_idempotency_key(*, route_id: int, event_type: str, payload: dict[str, Any]) -> str:
raw = f"{route_id}:{event_type}:{_serialize_json(payload)}"
return hashlib.sha256(raw.encode("utf-8")).hexdigest()
+def _recipient_scope(provider_config: dict[str, Any] | None) -> str:
+ scope = str((provider_config or {}).get("recipient_scope") or "").strip().lower()
+ return scope or "fixed"
+
+
def _serialize_route(route: IntegrationRoute) -> dict[str, Any]:
+ provider_config = _deserialize_json(route.provider_config_json)
return {
"id": route.id,
"event_type": route.event_type,
@@ -61,12 +125,74 @@ def _serialize_route(route: IntegrationRoute) -> dict[str, Any]:
"enabled": bool(route.enabled),
"recipient_email": route.recipient_email,
"recipient_name": route.recipient_name,
+ "recipient_scope": _recipient_scope(provider_config),
"subject_template": route.subject_template,
"body_template": route.body_template,
- "provider_config": _deserialize_json(route.provider_config_json),
+ "provider_config": provider_config,
+ "created_at": route.created_at.isoformat() if route.created_at else None,
+ "updated_at": route.updated_at.isoformat() if route.updated_at else None,
}
+def _is_user_profile_route_config(provider_config: dict[str, Any] | None) -> bool:
+ return _recipient_scope(provider_config) == USER_PROFILE_ROUTE_SCOPE
+
+
+def _is_legacy_user_profile_route_config(provider_config: dict[str, Any] | None) -> bool:
+ return _is_user_profile_route_config(provider_config) and _normalize_int((provider_config or {}).get("user_id")) is not None
+
+
+def _resolve_user_profile_recipient(
+ db,
+ *,
+ payload: dict[str, Any],
+ route: IntegrationRoute,
+) -> dict[str, str] | None:
+ user_id = _normalize_int(payload.get("user_id"))
+ if user_id is None:
+ return None
+
+ user = UserRepository(db).get_by_id(user_id=user_id)
+ if user is None:
+ return None
+
+ email = _clean_text(getattr(user, "email", None))
+ if not email:
+ return None
+
+ recipient = {"email": email}
+ recipient_name = _clean_text(getattr(user, "name", None)) or _clean_text(route.recipient_name)
+ if recipient_name:
+ recipient["name"] = recipient_name
+ return recipient
+
+
+def _resolve_route_recipient(
+ db,
+ *,
+ route: IntegrationRoute,
+ payload: dict[str, Any],
+ provider_config: dict[str, Any] | None = None,
+) -> dict[str, str] | None:
+ normalized_provider_config = provider_config if isinstance(provider_config, dict) else _deserialize_json(route.provider_config_json)
+ if _is_user_profile_route_config(normalized_provider_config):
+ return _resolve_user_profile_recipient(
+ db,
+ payload=payload,
+ route=route,
+ )
+
+ email = _clean_text(route.recipient_email)
+ if not email:
+ return None
+
+ recipient = {"email": email}
+ recipient_name = _clean_text(route.recipient_name)
+ if recipient_name:
+ recipient["name"] = recipient_name
+ return recipient
+
+
def _serialize_delivery(delivery: IntegrationDelivery) -> dict[str, Any]:
return {
"id": delivery.id,
@@ -76,6 +202,8 @@ def _serialize_delivery(delivery: IntegrationDelivery) -> dict[str, Any]:
"status": delivery.status,
"attempts": int(delivery.attempts or 0),
"payload": _deserialize_json(delivery.payload_json),
+ "recipient_email": delivery.recipient_email,
+ "recipient_name": delivery.recipient_name,
"rendered_subject": delivery.rendered_subject,
"rendered_body": delivery.rendered_body,
"provider_message_id": delivery.provider_message_id,
@@ -83,6 +211,7 @@ def _serialize_delivery(delivery: IntegrationDelivery) -> dict[str, Any]:
"idempotency_key": delivery.idempotency_key,
"dispatched_at": delivery.dispatched_at.isoformat() if delivery.dispatched_at else None,
"created_at": delivery.created_at.isoformat() if delivery.created_at else None,
+ "updated_at": delivery.updated_at.isoformat() if delivery.updated_at else None,
}
@@ -93,22 +222,64 @@ def _get_provider(provider_name: str):
return factory()
-def list_integration_routes(*, event_type: str | None = None, provider: str | None = None) -> list[dict[str, Any]]:
+def list_integration_routes(
+ *,
+ event_type: str | None = None,
+ provider: str | None = None,
+ enabled: bool | None = None,
+) -> list[dict[str, Any]]:
db = SessionMockLocal()
try:
query = db.query(IntegrationRoute)
- normalized_event_type = _clean_text(event_type)
+ normalized_event_type = _validate_event_type(event_type) if _clean_text(event_type) else None
if normalized_event_type:
query = query.filter(IntegrationRoute.event_type == normalized_event_type)
normalized_provider = _clean_text(provider)
if normalized_provider:
query = query.filter(IntegrationRoute.provider == normalized_provider)
+ if enabled is not None:
+ query = query.filter(IntegrationRoute.enabled.is_(bool(enabled)))
routes = query.order_by(IntegrationRoute.event_type.asc(), IntegrationRoute.id.asc()).all()
return [_serialize_route(route) for route in routes]
finally:
db.close()
+def list_integration_deliveries(
+ *,
+ statuses: str | list[str] | tuple[str, ...] | None = None,
+ event_type: str | None = None,
+ provider: str | None = None,
+ route_id: int | None = None,
+ limit: int = 50,
+) -> list[dict[str, Any]]:
+ normalized_statuses = _normalize_delivery_statuses(statuses)
+ normalized_event_type = _validate_event_type(event_type) if _clean_text(event_type) else None
+ normalized_provider = _clean_text(provider)
+ normalized_route_id = int(route_id) if route_id is not None else None
+ normalized_limit = max(1, int(limit)) if limit and int(limit) > 0 else None
+
+ db = SessionMockLocal()
+ try:
+ query = db.query(IntegrationDelivery)
+ if normalized_statuses:
+ query = query.filter(IntegrationDelivery.status.in_(normalized_statuses))
+ if normalized_event_type:
+ query = query.filter(IntegrationDelivery.event_type == normalized_event_type)
+ if normalized_provider:
+ query = query.filter(IntegrationDelivery.provider == normalized_provider)
+ if normalized_route_id is not None:
+ query = query.filter(IntegrationDelivery.route_id == normalized_route_id)
+
+ query = query.order_by(IntegrationDelivery.id.desc())
+ if normalized_limit is not None:
+ query = query.limit(normalized_limit)
+ rows = query.all()
+ return [_serialize_delivery(row) for row in rows]
+ finally:
+ db.close()
+
+
def upsert_email_integration_route(
*,
event_type: str,
@@ -122,7 +293,10 @@ def upsert_email_integration_route(
) -> dict[str, Any]:
normalized_event_type = _validate_event_type(event_type)
normalized_provider = _clean_text(provider) or "brevo_email"
+ normalized_provider_config = dict(provider_config or {})
normalized_email = _clean_text(recipient_email)
+ if not normalized_email and _is_user_profile_route_config(normalized_provider_config):
+ normalized_email = DYNAMIC_USER_ROUTE_EMAIL
if not normalized_email:
raise ValueError("recipient_email is required")
@@ -147,7 +321,7 @@ def upsert_email_integration_route(
route.recipient_name = _clean_text(recipient_name)
route.subject_template = _clean_text(subject_template)
route.body_template = _clean_text(body_template)
- route.provider_config_json = _serialize_json(provider_config)
+ route.provider_config_json = _serialize_json(normalized_provider_config)
db.commit()
db.refresh(route)
return _serialize_route(route)
@@ -155,6 +329,56 @@ def upsert_email_integration_route(
db.close()
+def sync_user_email_integration_routes(
+ *,
+ user_id: int,
+ recipient_email: str,
+ recipient_name: str | None = None,
+) -> list[dict[str, Any]]:
+ normalized_user_id = _normalize_int(user_id)
+ normalized_email = _clean_text(recipient_email)
+ if normalized_user_id is None:
+ raise ValueError("user_id is required")
+ if not normalized_email:
+ raise ValueError("recipient_email is required")
+
+ dynamic_provider_config = {
+ "recipient_scope": USER_PROFILE_ROUTE_SCOPE,
+ }
+
+ db = SessionMockLocal()
+ try:
+ routes = (
+ db.query(IntegrationRoute)
+ .filter(IntegrationRoute.provider == "brevo_email")
+ .filter(IntegrationRoute.event_type.in_(USER_PROFILE_EVENT_TYPES))
+ .all()
+ )
+ changed = False
+ for route in routes:
+ provider_config = _deserialize_json(route.provider_config_json)
+ if _is_legacy_user_profile_route_config(provider_config) and route.enabled:
+ route.enabled = False
+ changed = True
+ if changed:
+ db.commit()
+ finally:
+ db.close()
+
+ synced_routes: list[dict[str, Any]] = []
+ for event_type in USER_PROFILE_EVENT_TYPES:
+ synced_routes.append(
+ upsert_email_integration_route(
+ event_type=event_type,
+ recipient_email=DYNAMIC_USER_ROUTE_EMAIL,
+ recipient_name=DYNAMIC_USER_ROUTE_NAME,
+ enabled=True,
+ provider_config=dynamic_provider_config,
+ )
+ )
+ return synced_routes
+
+
async def emit_business_event(event_type: str, payload: dict[str, Any] | None) -> list[dict[str, Any]]:
if not settings.integrations_enabled:
return []
@@ -172,6 +396,16 @@ async def emit_business_event(event_type: str, payload: dict[str, Any] | None) -
.all()
)
for route in routes:
+ route_provider_config = _deserialize_json(route.provider_config_json)
+ recipient = _resolve_route_recipient(
+ db,
+ route=route,
+ payload=normalized_payload,
+ provider_config=route_provider_config,
+ )
+ if recipient is None:
+ continue
+
idempotency_key = _build_idempotency_key(
route_id=route.id,
event_type=normalized_event_type,
@@ -192,6 +426,8 @@ async def emit_business_event(event_type: str, payload: dict[str, Any] | None) -
provider=route.provider,
status="pending",
payload_json=_serialize_json(normalized_payload),
+ recipient_email=recipient.get("email"),
+ recipient_name=recipient.get("name"),
idempotency_key=idempotency_key,
)
db.add(delivery)
@@ -245,6 +481,7 @@ async def dispatch_delivery(delivery_id: int) -> dict[str, Any] | None:
return _serialize_delivery(delivery)
payload = _deserialize_json(delivery.payload_json)
+ route_provider_config = _deserialize_json(route.provider_config_json)
subject, body = render_email_content(
event_type=delivery.event_type,
payload=payload,
@@ -255,14 +492,35 @@ async def dispatch_delivery(delivery_id: int) -> dict[str, Any] | None:
delivery.rendered_body = body
delivery.attempts = int(delivery.attempts or 0) + 1
+ recipient_email = _clean_text(delivery.recipient_email)
+ recipient_name = _clean_text(delivery.recipient_name)
+ if not recipient_email:
+ recipient = _resolve_route_recipient(
+ db,
+ route=route,
+ payload=payload,
+ provider_config=route_provider_config,
+ )
+ if recipient is None:
+ delivery.status = "skipped"
+ delivery.last_error = "Recipient email unavailable for delivery."
+ db.commit()
+ db.refresh(delivery)
+ return _serialize_delivery(delivery)
+ recipient_email = recipient.get("email")
+ recipient_name = recipient.get("name")
+ delivery.recipient_email = recipient_email
+ delivery.recipient_name = recipient_name
+
try:
provider = _get_provider(route.provider)
result = await provider.send_email(
- to_email=route.recipient_email,
- to_name=route.recipient_name,
+ to_email=recipient_email,
+ to_name=recipient_name,
subject=subject,
body=body,
tags=[delivery.event_type],
+ provider_config=route_provider_config,
)
except IntegrationProviderError as exc:
delivery.status = "failed"
@@ -286,23 +544,38 @@ async def process_pending_deliveries(
*,
limit: int = 20,
delivery_ids: list[int] | None = None,
+ statuses: str | list[str] | tuple[str, ...] | None = None,
+ event_type: str | None = None,
+ provider: str | None = None,
+ route_id: int | None = None,
) -> list[dict[str, Any]]:
+ normalized_statuses = _normalize_delivery_statuses(statuses)
+ normalized_event_type = _validate_event_type(event_type) if _clean_text(event_type) else None
+ normalized_provider = _clean_text(provider)
+ normalized_route_id = int(route_id) if route_id is not None else None
+ normalized_delivery_ids = sorted({int(delivery_id) for delivery_id in delivery_ids or []})
+
+ if not normalized_delivery_ids and normalized_statuses is None:
+ normalized_statuses = ["pending", "failed"]
+
db = SessionMockLocal()
try:
query = db.query(IntegrationDelivery)
- if delivery_ids:
- query = query.filter(IntegrationDelivery.id.in_(delivery_ids))
+ if normalized_delivery_ids:
+ query = query.filter(IntegrationDelivery.id.in_(normalized_delivery_ids))
else:
- query = query.filter(
- or_(
- IntegrationDelivery.status == "pending",
- IntegrationDelivery.status == "failed",
- )
- )
+ if normalized_statuses:
+ query = query.filter(IntegrationDelivery.status.in_(normalized_statuses))
+ if normalized_event_type:
+ query = query.filter(IntegrationDelivery.event_type == normalized_event_type)
+ if normalized_provider:
+ query = query.filter(IntegrationDelivery.provider == normalized_provider)
+ if normalized_route_id is not None:
+ query = query.filter(IntegrationDelivery.route_id == normalized_route_id)
query = query.order_by(IntegrationDelivery.id.asc())
if limit and int(limit) > 0:
query = query.limit(max(1, int(limit)))
- if delivery_ids:
+ if normalized_delivery_ids:
query = query.order_by(IntegrationDelivery.id.asc())
selected_ids = [row.id for row in query.all()]
finally:
diff --git a/app/services/orchestration/conversation_state_store.py b/app/services/orchestration/conversation_state_store.py
index 883df31..11abfc2 100644
--- a/app/services/orchestration/conversation_state_store.py
+++ b/app/services/orchestration/conversation_state_store.py
@@ -21,6 +21,7 @@ class ConversationStateStore(ConversationStateRepository):
self.pending_stock_selections: dict[int, dict] = {}
self.pending_rental_drafts: dict[int, dict] = {}
self.pending_rental_selections: dict[int, dict] = {}
+ self.pending_email_capture_requests: dict[int, dict] = {}
self.telegram_processed_messages: dict[int, dict] = {}
self.telegram_runtime_state: dict[int, dict] = {}
diff --git a/app/services/orchestration/orquestrador_service.py b/app/services/orchestration/orquestrador_service.py
index b983f32..9eeb61f 100644
--- a/app/services/orchestration/orquestrador_service.py
+++ b/app/services/orchestration/orquestrador_service.py
@@ -9,6 +9,17 @@ from uuid import uuid4
from fastapi import HTTPException
from sqlalchemy.orm import Session
+from app.db.mock_database import SessionMockLocal
+from app.repositories.user_repository import UserRepository
+from app.services.integrations.events import (
+ ORDER_CANCELLED_EVENT,
+ ORDER_CREATED_EVENT,
+ RENTAL_OPENED_EVENT,
+ RENTAL_PAYMENT_REGISTERED_EVENT,
+ RENTAL_RETURN_REGISTERED_EVENT,
+ REVIEW_SCHEDULED_EVENT,
+)
+from app.services.integrations.service import emit_business_event, sync_user_email_integration_routes
from app.services.orchestration.orchestrator_config import (
LOW_VALUE_RESPONSES,
ORCHESTRATION_CONTROL_TOOLS,
@@ -39,6 +50,18 @@ from app.services.orchestration.response_formatter import format_currency_br, fo
logger = logging.getLogger(__name__)
+EMAIL_CAPTURE_BUCKET = "pending_email_capture_requests"
+EMAIL_CAPTURE_TTL_MINUTES = 30
+EMAIL_CAPTURE_PATTERN = re.compile(r"^[A-Z0-9._%+-]+@[A-Z0-9.-]+\.[A-Z]{2,}$", re.IGNORECASE)
+EMAIL_CAPTURE_EVENT_BY_TOOL = {
+ "realizar_pedido": ORDER_CREATED_EVENT,
+ "cancelar_pedido": ORDER_CANCELLED_EVENT,
+ "agendar_revisao": REVIEW_SCHEDULED_EVENT,
+ "abrir_locacao_aluguel": RENTAL_OPENED_EVENT,
+ "registrar_pagamento_aluguel": RENTAL_PAYMENT_REGISTERED_EVENT,
+ "registrar_devolucao_aluguel": RENTAL_RETURN_REGISTERED_EVENT,
+}
+
# Coordenador principal do turno conversacional:
# atualiza estado, pede decisoes ao modelo, continua fluxos e executa tools.
@@ -57,6 +80,7 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin, RentalFlowMixin):
self.tool_executor = ToolExecutor(registry=self.registry)
self.policy = ConversationPolicy(service=self)
self.history_service = ConversationHistoryService()
+ self._user_profile_routes_ready = False
@property
def _context_manager(self) -> OrchestratorContextManager:
@@ -112,6 +136,10 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin, RentalFlowMixin):
base_response=composed,
user_id=user_id,
)
+ final_response = self._append_email_capture_prompt_if_needed(
+ response=final_response,
+ user_id=user_id,
+ )
turn_trace["elapsed_ms"] = round((perf_counter() - turn_started_perf) * 1000, 2)
self._log_turn_event("turn_completed", response=final_response)
if not turn_history_persisted:
@@ -125,6 +153,13 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin, RentalFlowMixin):
try:
self._upsert_user_context(user_id=user_id)
+ self._ensure_user_email_routes(user_id=user_id)
+ pending_email_capture_response = await self._try_handle_pending_email_capture_message(
+ message=message,
+ user_id=user_id,
+ )
+ if pending_email_capture_response:
+ return await finish(pending_email_capture_response)
if hasattr(self, "policy") and self._is_order_selection_reset_message(message):
reset_override = await self._try_handle_immediate_context_reset(
message=message,
@@ -1659,6 +1694,189 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin, RentalFlowMixin):
def _save_user_context(self, user_id: int | None, context: dict | None) -> None:
self._context_manager.save_user_context(user_id=user_id, context=context)
+ def _get_user_record(self, user_id: int | None):
+ if user_id is None:
+ return None
+ db = SessionMockLocal()
+ try:
+ return UserRepository(db).get_by_id(user_id=user_id)
+ except Exception:
+ logger.debug(
+ "Falha ao carregar cadastro do usuario para email.",
+ extra={"user_id": user_id},
+ )
+ return None
+ finally:
+ db.close()
+
+ def _get_saved_user_email(self, user_id: int | None) -> str | None:
+ user = self._get_user_record(user_id=user_id)
+ return str(getattr(user, "email", "") or "").strip().lower() or None
+
+ def _save_user_email(self, user_id: int | None, email: str | None):
+ if user_id is None:
+ return None
+ db = SessionMockLocal()
+ try:
+ repo = UserRepository(db)
+ user = repo.get_by_id(user_id=user_id)
+ if not user:
+ return None
+ return repo.update_email(user=user, email=email)
+ except Exception:
+ logger.debug(
+ "Falha ao salvar email do usuario.",
+ extra={"user_id": user_id},
+ )
+ return None
+ finally:
+ db.close()
+
+ def _ensure_user_email_routes(self, user_id: int | None) -> None:
+ if getattr(self, "_user_profile_routes_ready", False):
+ return
+ user = self._get_user_record(user_id=user_id)
+ if not user or not getattr(user, "email", None):
+ return
+ try:
+ sync_user_email_integration_routes(
+ user_id=user.id,
+ recipient_email=user.email,
+ recipient_name=user.name,
+ )
+ self._user_profile_routes_ready = True
+ except Exception:
+ logger.exception(
+ "Falha ao sincronizar rotas de email do usuario.",
+ extra={"user_id": user_id},
+ )
+
+ def _normalize_email_address(self, value: str | None) -> str | None:
+ normalized = str(value or "").strip().lower()
+ if not normalized:
+ return None
+ if not EMAIL_CAPTURE_PATTERN.fullmatch(normalized):
+ return None
+ return normalized
+
+ def _is_email_capture_decline_message(self, text: str) -> bool:
+ normalized = self._normalize_text(text).strip().rstrip(".!?,;:")
+ return normalized in {
+ "nao",
+ "nao quero",
+ "nao quero informar",
+ "prefiro nao informar",
+ "agora nao",
+ "sem email",
+ }
+
+ def _get_pending_email_capture_request(self, user_id: int | None) -> dict | None:
+ state = getattr(self, "state", None)
+ if state is None or not hasattr(state, "get_entry"):
+ return None
+ return state.get_entry(EMAIL_CAPTURE_BUCKET, user_id, expire=True)
+
+ def _clear_pending_email_capture_request(self, user_id: int | None) -> None:
+ state = getattr(self, "state", None)
+ if state is None or not hasattr(state, "pop_entry"):
+ return
+ state.pop_entry(EMAIL_CAPTURE_BUCKET, user_id)
+
+ def _stage_email_capture_request(
+ self,
+ tool_name: str,
+ tool_result,
+ user_id: int | None,
+ ) -> None:
+ state = getattr(self, "state", None)
+ if (
+ user_id is None
+ or tool_name not in EMAIL_CAPTURE_EVENT_BY_TOOL
+ or not isinstance(tool_result, dict)
+ or state is None
+ or not hasattr(state, "set_entry")
+ ):
+ return
+ if self._get_saved_user_email(user_id=user_id):
+ return
+
+ payload = dict(tool_result)
+ payload.setdefault("user_id", user_id)
+ state.set_entry(
+ EMAIL_CAPTURE_BUCKET,
+ user_id,
+ {
+ "request_id": str((getattr(self, "_turn_trace", {}) or {}).get("request_id") or ""),
+ "event_type": EMAIL_CAPTURE_EVENT_BY_TOOL[tool_name],
+ "payload": payload,
+ "expires_at": utc_now() + timedelta(minutes=EMAIL_CAPTURE_TTL_MINUTES),
+ },
+ )
+
+ def _append_email_capture_prompt_if_needed(self, response: str, user_id: int | None) -> str:
+ if user_id is None or self._get_saved_user_email(user_id=user_id):
+ return response
+ pending = self._get_pending_email_capture_request(user_id=user_id)
+ current_request_id = str((getattr(self, "_turn_trace", {}) or {}).get("request_id") or "")
+ if not pending or pending.get("request_id") != current_request_id:
+ return response
+ prompt = (
+ "Se quiser, posso te enviar esse resumo por e-mail. "
+ "Responda com um e-mail valido ou diga 'prefiro nao informar'."
+ )
+ base = str(response or "").rstrip()
+ return f"{base}\n\n{prompt}" if base else prompt
+
+ async def _try_handle_pending_email_capture_message(
+ self,
+ message: str,
+ user_id: int | None,
+ ) -> str | None:
+ if user_id is None:
+ return None
+ pending = self._get_pending_email_capture_request(user_id=user_id)
+ if not pending:
+ return None
+
+ if self._is_email_capture_decline_message(message):
+ self._clear_pending_email_capture_request(user_id=user_id)
+ return "Tudo bem. Nao vou enviar este resumo por e-mail."
+
+ normalized_email = self._normalize_email_address(message)
+ if not normalized_email:
+ return None
+
+ user = self._save_user_email(user_id=user_id, email=normalized_email)
+ if not user:
+ self._clear_pending_email_capture_request(user_id=user_id)
+ return "Nao consegui localizar seu cadastro para salvar o e-mail."
+
+ self._ensure_user_email_routes(user_id=user_id)
+ event_type = str(pending.get("event_type") or "").strip()
+ payload = dict(pending.get("payload") or {})
+ payload.setdefault("user_id", user_id)
+
+ deliveries = []
+ if event_type and payload:
+ try:
+ deliveries = await emit_business_event(event_type=event_type, payload=payload)
+ except Exception:
+ logger.exception(
+ "Falha ao reenviar evento apos captura de email do usuario.",
+ extra={"user_id": user_id, "event_type": event_type},
+ )
+
+ self._clear_pending_email_capture_request(user_id=user_id)
+ delivered = any(
+ isinstance(item, dict)
+ and item.get("status") == "sent"
+ and str(item.get("provider_message_id") or "").strip()
+ for item in deliveries
+ )
+ if delivered:
+ return f"Perfeito. Salvei seu e-mail {normalized_email} e enviei este resumo por la."
+ return f"Perfeito. Salvei seu e-mail {normalized_email}. Vou usar esse endereco nos proximos envios."
+
def _extract_generic_memory_fields(self, llm_generic_fields: dict | None = None) -> dict:
return self._context_manager.extract_generic_memory_fields(
llm_generic_fields=llm_generic_fields,
@@ -1699,6 +1917,11 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin, RentalFlowMixin):
tool_result=tool_result,
user_id=user_id,
)
+ self._stage_email_capture_request(
+ tool_name=tool_name,
+ tool_result=tool_result,
+ user_id=user_id,
+ )
async def _maybe_build_stock_suggestion_response(
self,
@@ -2061,25 +2284,7 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin, RentalFlowMixin):
if decision_intent != "review_schedule":
return False
- entities = extracted_entities if isinstance(extracted_entities, dict) else {}
- review_fields = entities.get("review_fields")
- generic_memory = entities.get("generic_memory")
- if not isinstance(review_fields, dict):
- review_fields = {}
- if not isinstance(generic_memory, dict):
- generic_memory = {}
-
- return any(
- (
- review_fields.get("placa"),
- review_fields.get("data_hora"),
- review_fields.get("modelo"),
- review_fields.get("ano"),
- review_fields.get("km"),
- review_fields.get("revisao_previa_concessionaria"),
- generic_memory.get("placa"),
- )
- )
+ return True
def _has_trade_in_evaluation_request(self, message: str, turn_decision: dict | None = None) -> bool:
normalized_message = self._normalize_text(message or "").strip()
@@ -2748,5 +2953,3 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin, RentalFlowMixin):
tool_name=tool_name,
tool_result=tool_result,
)
-
-
diff --git a/scripts/list_integration_deliveries.py b/scripts/list_integration_deliveries.py
new file mode 100644
index 0000000..56dcdd5
--- /dev/null
+++ b/scripts/list_integration_deliveries.py
@@ -0,0 +1,34 @@
+import argparse
+import json
+import sys
+from pathlib import Path
+
+PROJECT_ROOT = Path(__file__).resolve().parents[1]
+if str(PROJECT_ROOT) not in sys.path:
+ sys.path.insert(0, str(PROJECT_ROOT))
+
+from app.services.integrations.events import SUPPORTED_EVENT_TYPES
+from app.services.integrations.service import SUPPORTED_DELIVERY_STATUSES, list_integration_deliveries
+
+
+def main() -> None:
+ parser = argparse.ArgumentParser(description="Lista entregas do outbox de integracoes com filtros operacionais.")
+ parser.add_argument("--status", action="append", choices=SUPPORTED_DELIVERY_STATUSES)
+ parser.add_argument("--event", choices=SUPPORTED_EVENT_TYPES)
+ parser.add_argument("--provider")
+ parser.add_argument("--route-id", type=int)
+ parser.add_argument("--limit", type=int, default=50)
+ args = parser.parse_args()
+
+ deliveries = list_integration_deliveries(
+ statuses=args.status,
+ event_type=args.event,
+ provider=args.provider,
+ route_id=args.route_id,
+ limit=args.limit,
+ )
+ print(json.dumps(deliveries, ensure_ascii=True, indent=2, sort_keys=True))
+
+
+if __name__ == "__main__":
+ main()
\ No newline at end of file
diff --git a/scripts/list_integration_routes.py b/scripts/list_integration_routes.py
new file mode 100644
index 0000000..153219e
--- /dev/null
+++ b/scripts/list_integration_routes.py
@@ -0,0 +1,38 @@
+import argparse
+import json
+import sys
+from pathlib import Path
+
+PROJECT_ROOT = Path(__file__).resolve().parents[1]
+if str(PROJECT_ROOT) not in sys.path:
+ sys.path.insert(0, str(PROJECT_ROOT))
+
+from app.services.integrations.events import SUPPORTED_EVENT_TYPES
+from app.services.integrations.service import list_integration_routes
+
+
+def main() -> None:
+ parser = argparse.ArgumentParser(description="Lista rotas de integracao configuradas.")
+ parser.add_argument("--event", choices=SUPPORTED_EVENT_TYPES)
+ parser.add_argument("--provider")
+ enabled_group = parser.add_mutually_exclusive_group()
+ enabled_group.add_argument("--enabled", action="store_true")
+ enabled_group.add_argument("--disabled", action="store_true")
+ args = parser.parse_args()
+
+ enabled = None
+ if args.enabled:
+ enabled = True
+ if args.disabled:
+ enabled = False
+
+ routes = list_integration_routes(
+ event_type=args.event,
+ provider=args.provider,
+ enabled=enabled,
+ )
+ print(json.dumps(routes, ensure_ascii=True, indent=2, sort_keys=True))
+
+
+if __name__ == "__main__":
+ main()
\ No newline at end of file
diff --git a/scripts/process_integration_deliveries.py b/scripts/process_integration_deliveries.py
index d349625..783d239 100644
--- a/scripts/process_integration_deliveries.py
+++ b/scripts/process_integration_deliveries.py
@@ -1,21 +1,57 @@
import argparse
import asyncio
import json
+import sys
+from pathlib import Path
-from app.services.integrations.service import process_pending_deliveries
+PROJECT_ROOT = Path(__file__).resolve().parents[1]
+if str(PROJECT_ROOT) not in sys.path:
+ sys.path.insert(0, str(PROJECT_ROOT))
+from app.services.integrations.events import SUPPORTED_EVENT_TYPES
+from app.services.integrations.service import SUPPORTED_DELIVERY_STATUSES, process_pending_deliveries
-async def _main_async(limit: int) -> None:
- deliveries = await process_pending_deliveries(limit=limit)
+
+async def _main_async(
+ *,
+ limit: int,
+ delivery_ids: list[int] | None,
+ statuses: list[str] | None,
+ event_type: str | None,
+ provider: str | None,
+ route_id: int | None,
+) -> None:
+ deliveries = await process_pending_deliveries(
+ limit=limit,
+ delivery_ids=delivery_ids,
+ statuses=statuses,
+ event_type=event_type,
+ provider=provider,
+ route_id=route_id,
+ )
print(json.dumps(deliveries, ensure_ascii=True, indent=2, sort_keys=True))
def main() -> None:
parser = argparse.ArgumentParser(description="Processa entregas pendentes do outbox de integracoes.")
parser.add_argument("--limit", type=int, default=20)
+ parser.add_argument("--delivery-id", dest="delivery_ids", type=int, action="append")
+ parser.add_argument("--status", action="append", choices=SUPPORTED_DELIVERY_STATUSES)
+ parser.add_argument("--event", choices=SUPPORTED_EVENT_TYPES)
+ parser.add_argument("--provider")
+ parser.add_argument("--route-id", type=int)
args = parser.parse_args()
- asyncio.run(_main_async(limit=args.limit))
+ asyncio.run(
+ _main_async(
+ limit=args.limit,
+ delivery_ids=args.delivery_ids,
+ statuses=args.status,
+ event_type=args.event,
+ provider=args.provider,
+ route_id=args.route_id,
+ )
+ )
if __name__ == "__main__":
- main()
+ main()
\ No newline at end of file
diff --git a/scripts/upsert_integration_route.py b/scripts/upsert_integration_route.py
index d0b5626..f4f4ffd 100644
--- a/scripts/upsert_integration_route.py
+++ b/scripts/upsert_integration_route.py
@@ -1,10 +1,93 @@
import argparse
import json
+import sys
+from pathlib import Path
+from typing import Any
+
+PROJECT_ROOT = Path(__file__).resolve().parents[1]
+if str(PROJECT_ROOT) not in sys.path:
+ sys.path.insert(0, str(PROJECT_ROOT))
from app.services.integrations.events import SUPPORTED_EVENT_TYPES
from app.services.integrations.service import upsert_email_integration_route
+def _deep_merge_dicts(base: dict[str, Any], extra: dict[str, Any]) -> dict[str, Any]:
+ merged = dict(base)
+ for key, value in extra.items():
+ current = merged.get(key)
+ if isinstance(current, dict) and isinstance(value, dict):
+ merged[key] = _deep_merge_dicts(current, value)
+ else:
+ merged[key] = value
+ return merged
+
+
+def _parse_provider_config_json(raw_value: str | None) -> dict[str, Any]:
+ if not raw_value:
+ return {}
+ try:
+ parsed = json.loads(raw_value)
+ except ValueError as exc:
+ raise SystemExit(f"provider_config_json invalido: {exc}") from exc
+ if not isinstance(parsed, dict):
+ raise SystemExit("provider_config_json deve ser um objeto JSON.")
+ return parsed
+
+
+def _parse_headers(values: list[str] | None) -> dict[str, str]:
+ headers: dict[str, str] = {}
+ for value in values or []:
+ key, separator, header_value = str(value or "").partition("=")
+ key = key.strip()
+ header_value = header_value.strip()
+ if not separator or not key or not header_value:
+ raise SystemExit(f"header invalido: {value}. Use o formato Chave=Valor.")
+ headers[key] = header_value
+ return headers
+
+
+def _build_provider_config(args) -> dict[str, Any] | None:
+ provider_config = _parse_provider_config_json(args.provider_config_json)
+ convenience_config: dict[str, Any] = {}
+
+ if args.sender_email or args.sender_name:
+ convenience_config["sender"] = {
+ key: value
+ for key, value in {
+ "email": args.sender_email,
+ "name": args.sender_name,
+ }.items()
+ if value
+ }
+
+ if args.reply_to_email or args.reply_to_name:
+ convenience_config["reply_to"] = {
+ key: value
+ for key, value in {
+ "email": args.reply_to_email,
+ "name": args.reply_to_name,
+ }.items()
+ if value
+ }
+
+ if args.cc:
+ convenience_config["cc"] = args.cc
+ if args.bcc:
+ convenience_config["bcc"] = args.bcc
+ if args.tag:
+ convenience_config["tags"] = args.tag
+ if args.html_content:
+ convenience_config["html_content"] = args.html_content
+
+ headers = _parse_headers(args.header)
+ if headers:
+ convenience_config["headers"] = headers
+
+ merged = _deep_merge_dicts(provider_config, convenience_config)
+ return merged or None
+
+
def main() -> None:
parser = argparse.ArgumentParser(description="Cria ou atualiza uma rota de integracao por email.")
parser.add_argument("--event", required=True, choices=SUPPORTED_EVENT_TYPES)
@@ -12,6 +95,16 @@ def main() -> None:
parser.add_argument("--name")
parser.add_argument("--subject-template")
parser.add_argument("--body-template")
+ parser.add_argument("--provider-config-json")
+ parser.add_argument("--sender-email")
+ parser.add_argument("--sender-name")
+ parser.add_argument("--reply-to-email")
+ parser.add_argument("--reply-to-name")
+ parser.add_argument("--cc", action="append")
+ parser.add_argument("--bcc", action="append")
+ parser.add_argument("--tag", action="append")
+ parser.add_argument("--header", action="append")
+ parser.add_argument("--html-content")
parser.add_argument("--disabled", action="store_true")
args = parser.parse_args()
@@ -22,6 +115,7 @@ def main() -> None:
subject_template=args.subject_template,
body_template=args.body_template,
enabled=not args.disabled,
+ provider_config=_build_provider_config(args),
)
print(json.dumps(route, ensure_ascii=True, indent=2, sort_keys=True))
diff --git a/tests/test_brevo_provider.py b/tests/test_brevo_provider.py
new file mode 100644
index 0000000..9f6cefb
--- /dev/null
+++ b/tests/test_brevo_provider.py
@@ -0,0 +1,100 @@
+import os
+import unittest
+from unittest.mock import AsyncMock, patch
+
+os.environ.setdefault("DEBUG", "false")
+
+from app.core.settings import settings
+from app.services.integrations.providers import BrevoEmailProvider
+
+
+class _FakeResponse:
+ def __init__(self, *, status_code: int = 201, payload: dict | None = None, text: str = "") -> None:
+ self.status_code = status_code
+ self._payload = payload or {}
+ self.text = text
+
+ def json(self) -> dict:
+ return self._payload
+
+
+class BrevoEmailProviderTests(unittest.IsolatedAsyncioTestCase):
+ async def test_send_email_applies_provider_config_to_payload(self):
+ response = _FakeResponse(payload={"messageId": "brevo-123"}, text='{"messageId":"brevo-123"}')
+ post_mock = AsyncMock(return_value=response)
+ fake_client = type("FakeClient", (), {"post": post_mock})()
+
+ with patch.object(settings, "brevo_api_key", "brevo-key"), patch.object(
+ settings,
+ "brevo_sender_email",
+ "sender@empresa.com",
+ ), patch.object(settings, "brevo_sender_name", "Orquestrador"), patch(
+ "app.services.integrations.providers.httpx.AsyncClient"
+ ) as client_cls:
+ client_cls.return_value.__aenter__ = AsyncMock(return_value=fake_client)
+ client_cls.return_value.__aexit__ = AsyncMock(return_value=None)
+ provider = BrevoEmailProvider()
+ result = await provider.send_email(
+ to_email="destinatario@empresa.com",
+ to_name="Operacoes",
+ subject="Pedido criado",
+ body="Pedido PED-1 criado com sucesso.",
+ tags=["order.created"],
+ provider_config={
+ "sender": {"email": "noreply@empresa.com", "name": "Operacoes"},
+ "reply_to": {"email": "atendimento@empresa.com", "name": "Atendimento"},
+ "cc": ["financeiro@empresa.com", {"email": "gestor@empresa.com", "name": "Gestor"}],
+ "bcc": ["auditoria@empresa.com"],
+ "tags": ["ops", "order.created"],
+ "headers": {"X-Canal": "orquestrador"},
+ "html_content": "Pedido PED-1 criado com sucesso.",
+ },
+ )
+
+ payload = post_mock.await_args.kwargs["json"]
+ self.assertEqual(payload["sender"], {"email": "noreply@empresa.com", "name": "Operacoes"})
+ self.assertEqual(payload["replyTo"], {"email": "atendimento@empresa.com", "name": "Atendimento"})
+ self.assertEqual(
+ payload["cc"],
+ [
+ {"email": "financeiro@empresa.com"},
+ {"email": "gestor@empresa.com", "name": "Gestor"},
+ ],
+ )
+ self.assertEqual(payload["bcc"], [{"email": "auditoria@empresa.com"}])
+ self.assertEqual(payload["tags"], ["order.created", "ops"])
+ self.assertEqual(payload["headers"], {"X-Canal": "orquestrador"})
+ self.assertEqual(payload["htmlContent"], "Pedido PED-1 criado com sucesso.")
+ self.assertEqual(result["message_id"], "brevo-123")
+
+ async def test_send_email_accepts_route_sender_without_global_sender_email(self):
+ response = _FakeResponse(payload={"messageId": "brevo-456"}, text='{"messageId":"brevo-456"}')
+ post_mock = AsyncMock(return_value=response)
+ fake_client = type("FakeClient", (), {"post": post_mock})()
+
+ with patch.object(settings, "brevo_api_key", "brevo-key"), patch.object(
+ settings,
+ "brevo_sender_email",
+ "",
+ ), patch.object(settings, "brevo_sender_name", "Orquestrador"), patch(
+ "app.services.integrations.providers.httpx.AsyncClient"
+ ) as client_cls:
+ client_cls.return_value.__aenter__ = AsyncMock(return_value=fake_client)
+ client_cls.return_value.__aexit__ = AsyncMock(return_value=None)
+ provider = BrevoEmailProvider()
+ await provider.send_email(
+ to_email="destinatario@empresa.com",
+ to_name=None,
+ subject="Pedido criado",
+ body="Pedido PED-1 criado com sucesso.",
+ provider_config={
+ "sender": {"email": "noreply@empresa.com", "name": "Operacoes"},
+ },
+ )
+
+ payload = post_mock.await_args.kwargs["json"]
+ self.assertEqual(payload["sender"], {"email": "noreply@empresa.com", "name": "Operacoes"})
+
+
+if __name__ == "__main__":
+ unittest.main()
\ No newline at end of file
diff --git a/tests/test_conversation_state_store.py b/tests/test_conversation_state_store.py
index 39cf44d..8c5002b 100644
--- a/tests/test_conversation_state_store.py
+++ b/tests/test_conversation_state_store.py
@@ -1,5 +1,6 @@
-import unittest
+import unittest
+from app.core.time_utils import utc_now
from app.services.orchestration.conversation_state_store import ConversationStateStore
@@ -34,6 +35,21 @@ class ConversationStateStoreTests(unittest.TestCase):
self.assertEqual(stored_context["active_task"], "order_create")
self.assertEqual(stored_context["expires_at"], original_expires_at)
+ def test_pending_email_capture_bucket_supports_set_get_and_pop(self):
+ store = ConversationStateStore()
+ payload = {
+ "request_id": "req-1",
+ "event_type": "order.created",
+ "payload": {"numero_pedido": "PED-1", "user_id": 7},
+ "expires_at": utc_now(),
+ }
+
+ store.set_entry("pending_email_capture_requests", 7, payload)
+
+ self.assertEqual(store.get_entry("pending_email_capture_requests", 7), payload)
+ self.assertEqual(store.pop_entry("pending_email_capture_requests", 7), payload)
+ self.assertIsNone(store.get_entry("pending_email_capture_requests", 7))
+
if __name__ == "__main__":
unittest.main()
diff --git a/tests/test_integration_service.py b/tests/test_integration_service.py
index fe4706a..31f7d34 100644
--- a/tests/test_integration_service.py
+++ b/tests/test_integration_service.py
@@ -1,4 +1,4 @@
-import os
+import os
import unittest
from unittest.mock import AsyncMock, patch
@@ -9,8 +9,8 @@ from sqlalchemy.pool import StaticPool
os.environ.setdefault("DEBUG", "false")
from app.db.mock_database import MockBase
-from app.db.mock_models import IntegrationDelivery
-from app.services.integrations.events import ORDER_CREATED_EVENT
+from app.db.mock_models import IntegrationDelivery, IntegrationRoute, User
+from app.services.integrations.events import ORDER_CREATED_EVENT, REVIEW_SCHEDULED_EVENT
from app.services.integrations import service as integration_service
@@ -26,6 +26,29 @@ class IntegrationServiceTests(unittest.IsolatedAsyncioTestCase):
self.addCleanup(engine.dispose)
return SessionLocal
+ def _create_user(
+ self,
+ SessionLocal,
+ *,
+ user_id: int,
+ email: str | None,
+ name: str = "Cliente Teste",
+ ) -> None:
+ db = SessionLocal()
+ try:
+ db.add(
+ User(
+ id=user_id,
+ channel="telegram",
+ external_id=f"tg-{user_id}",
+ name=name,
+ email=email,
+ )
+ )
+ db.commit()
+ finally:
+ db.close()
+
async def test_emit_business_event_creates_and_dispatches_delivery(self):
SessionLocal = self._build_session_local()
@@ -66,15 +89,62 @@ class IntegrationServiceTests(unittest.IsolatedAsyncioTestCase):
self.assertEqual(len(deliveries), 1)
self.assertEqual(deliveries[0]["status"], "sent")
self.assertEqual(deliveries[0]["provider_message_id"], "brevo-123")
+ self.assertEqual(deliveries[0]["recipient_email"], "ops@example.com")
db = SessionLocal()
try:
stored = db.query(IntegrationDelivery).one()
self.assertEqual(stored.status, "sent")
self.assertEqual(stored.provider_message_id, "brevo-123")
+ self.assertEqual(stored.recipient_email, "ops@example.com")
finally:
db.close()
+ async def test_emit_business_event_passes_route_provider_config_to_provider(self):
+ SessionLocal = self._build_session_local()
+ fake_provider = type(
+ "FakeProvider",
+ (),
+ {
+ "send_email": AsyncMock(return_value={"message_id": "brevo-config"}),
+ },
+ )()
+ provider_config = {
+ "sender": {"email": "noreply@example.com", "name": "Operacoes"},
+ "reply_to": {"email": "atendimento@example.com", "name": "Atendimento"},
+ "tags": ["ops"],
+ }
+
+ with patch.object(integration_service, "SessionMockLocal", SessionLocal), patch.object(
+ integration_service.settings,
+ "integrations_enabled",
+ True,
+ ), patch.object(
+ integration_service.settings,
+ "integration_sync_delivery_enabled",
+ True,
+ ), patch.object(
+ integration_service,
+ "_get_provider",
+ return_value=fake_provider,
+ ):
+ integration_service.upsert_email_integration_route(
+ event_type=ORDER_CREATED_EVENT,
+ recipient_email="ops@example.com",
+ provider_config=provider_config,
+ )
+ await integration_service.emit_business_event(
+ ORDER_CREATED_EVENT,
+ {
+ "numero_pedido": "PED-1",
+ },
+ )
+
+ fake_provider.send_email.assert_awaited_once()
+ kwargs = fake_provider.send_email.await_args.kwargs
+ self.assertEqual(kwargs["provider_config"], provider_config)
+ self.assertEqual(kwargs["tags"], [ORDER_CREATED_EVENT])
+
async def test_emit_business_event_deduplicates_by_route_and_payload(self):
SessionLocal = self._build_session_local()
@@ -102,6 +172,155 @@ class IntegrationServiceTests(unittest.IsolatedAsyncioTestCase):
finally:
db.close()
+ async def test_emit_business_event_skips_dynamic_user_route_without_saved_email(self):
+ SessionLocal = self._build_session_local()
+ self._create_user(SessionLocal, user_id=7, email=None)
+
+ with patch.object(integration_service, "SessionMockLocal", SessionLocal), patch.object(
+ integration_service.settings,
+ "integrations_enabled",
+ True,
+ ), patch.object(
+ integration_service.settings,
+ "integration_sync_delivery_enabled",
+ False,
+ ):
+ integration_service.sync_user_email_integration_routes(
+ user_id=7,
+ recipient_email="cliente@example.com",
+ recipient_name="Cliente Teste",
+ )
+ deliveries = await integration_service.emit_business_event(
+ ORDER_CREATED_EVENT,
+ {"numero_pedido": "PED-SEM-EMAIL", "user_id": 7},
+ )
+
+ self.assertEqual(deliveries, [])
+
+ db = SessionLocal()
+ try:
+ self.assertEqual(db.query(IntegrationDelivery).count(), 0)
+ finally:
+ db.close()
+
+ async def test_emit_business_event_resolves_dynamic_user_recipient_from_user_profile(self):
+ SessionLocal = self._build_session_local()
+ self._create_user(SessionLocal, user_id=7, email="cliente@example.com", name="Cliente Teste")
+ fake_provider = type(
+ "FakeProvider",
+ (),
+ {
+ "send_email": AsyncMock(return_value={"message_id": "brevo-dynamic"}),
+ },
+ )()
+
+ with patch.object(integration_service, "SessionMockLocal", SessionLocal), patch.object(
+ integration_service.settings,
+ "integrations_enabled",
+ True,
+ ), patch.object(
+ integration_service.settings,
+ "integration_sync_delivery_enabled",
+ True,
+ ), patch.object(
+ integration_service,
+ "_get_provider",
+ return_value=fake_provider,
+ ):
+ integration_service.sync_user_email_integration_routes(
+ user_id=7,
+ recipient_email="cliente@example.com",
+ recipient_name="Cliente Teste",
+ )
+ deliveries = await integration_service.emit_business_event(
+ ORDER_CREATED_EVENT,
+ {"numero_pedido": "PED-DINAMICO", "user_id": 7},
+ )
+
+ self.assertEqual(len(deliveries), 1)
+ self.assertEqual(deliveries[0]["status"], "sent")
+ self.assertEqual(deliveries[0]["recipient_email"], "cliente@example.com")
+
+ fake_provider.send_email.assert_awaited_once()
+ kwargs = fake_provider.send_email.await_args.kwargs
+ self.assertEqual(kwargs["to_email"], "cliente@example.com")
+ self.assertEqual(kwargs["to_name"], "Cliente Teste")
+
+ db = SessionLocal()
+ try:
+ stored = db.query(IntegrationDelivery).one()
+ self.assertEqual(stored.recipient_email, "cliente@example.com")
+ self.assertEqual(stored.recipient_name, "Cliente Teste")
+ finally:
+ db.close()
+
+ async def test_sync_user_email_integration_routes_creates_one_global_dynamic_route_per_event(self):
+ SessionLocal = self._build_session_local()
+
+ with patch.object(integration_service, "SessionMockLocal", SessionLocal):
+ integration_service.sync_user_email_integration_routes(
+ user_id=7,
+ recipient_email="primeiro@example.com",
+ recipient_name="Cliente Teste",
+ )
+ integration_service.sync_user_email_integration_routes(
+ user_id=9,
+ recipient_email="segundo@example.com",
+ recipient_name="Outro Cliente",
+ )
+
+ db = SessionLocal()
+ try:
+ routes = (
+ db.query(IntegrationRoute)
+ .filter(IntegrationRoute.event_type == ORDER_CREATED_EVENT)
+ .order_by(IntegrationRoute.id.asc())
+ .all()
+ )
+ self.assertEqual(len(routes), 1)
+ self.assertEqual(routes[0].recipient_email, integration_service.DYNAMIC_USER_ROUTE_EMAIL)
+ self.assertTrue(routes[0].enabled)
+ self.assertEqual(
+ routes[0].provider_config_json,
+ integration_service._serialize_json({"recipient_scope": integration_service.USER_PROFILE_ROUTE_SCOPE}),
+ )
+ finally:
+ db.close()
+
+ async def test_sync_user_email_integration_routes_disables_legacy_user_specific_routes(self):
+ SessionLocal = self._build_session_local()
+
+ with patch.object(integration_service, "SessionMockLocal", SessionLocal):
+ integration_service.upsert_email_integration_route(
+ event_type=ORDER_CREATED_EVENT,
+ recipient_email="legacy@example.com",
+ enabled=True,
+ provider_config={
+ "recipient_scope": integration_service.USER_PROFILE_ROUTE_SCOPE,
+ "user_id": 7,
+ },
+ )
+ integration_service.sync_user_email_integration_routes(
+ user_id=7,
+ recipient_email="cliente@example.com",
+ recipient_name="Cliente Teste",
+ )
+
+ db = SessionLocal()
+ try:
+ routes = (
+ db.query(IntegrationRoute)
+ .filter(IntegrationRoute.event_type == ORDER_CREATED_EVENT)
+ .order_by(IntegrationRoute.id.asc())
+ .all()
+ )
+ self.assertEqual(len(routes), 2)
+ self.assertFalse(routes[0].enabled)
+ self.assertTrue(routes[1].enabled)
+ self.assertEqual(routes[1].recipient_email, integration_service.DYNAMIC_USER_ROUTE_EMAIL)
+ finally:
+ db.close()
+
async def test_process_pending_deliveries_marks_failure_when_provider_fails(self):
SessionLocal = self._build_session_local()
@@ -131,6 +350,131 @@ class IntegrationServiceTests(unittest.IsolatedAsyncioTestCase):
self.assertEqual(deliveries[0]["status"], "failed")
self.assertIn("brevo offline", deliveries[0]["last_error"])
+ async def test_list_integration_deliveries_filters_by_status_and_event(self):
+ SessionLocal = self._build_session_local()
+
+ with patch.object(integration_service, "SessionMockLocal", SessionLocal), patch.object(
+ integration_service.settings,
+ "integrations_enabled",
+ True,
+ ), patch.object(
+ integration_service.settings,
+ "integration_sync_delivery_enabled",
+ False,
+ ):
+ integration_service.upsert_email_integration_route(
+ event_type=ORDER_CREATED_EVENT,
+ recipient_email="ops@example.com",
+ )
+ integration_service.upsert_email_integration_route(
+ event_type=REVIEW_SCHEDULED_EVENT,
+ recipient_email="review@example.com",
+ )
+ await integration_service.emit_business_event(ORDER_CREATED_EVENT, {"numero_pedido": "PED-1"})
+ await integration_service.emit_business_event(REVIEW_SCHEDULED_EVENT, {"protocolo": "REV-1"})
+
+ db = SessionLocal()
+ try:
+ order_delivery = (
+ db.query(IntegrationDelivery)
+ .filter(IntegrationDelivery.event_type == ORDER_CREATED_EVENT)
+ .one()
+ )
+ order_delivery.status = "failed"
+ db.commit()
+ finally:
+ db.close()
+
+ with patch.object(integration_service, "SessionMockLocal", SessionLocal):
+ deliveries = integration_service.list_integration_deliveries(
+ statuses=["failed"],
+ event_type=ORDER_CREATED_EVENT,
+ limit=10,
+ )
+
+ self.assertEqual(len(deliveries), 1)
+ self.assertEqual(deliveries[0]["event_type"], ORDER_CREATED_EVENT)
+ self.assertEqual(deliveries[0]["status"], "failed")
+ self.assertEqual(deliveries[0]["recipient_email"], "ops@example.com")
+
+ async def test_process_pending_deliveries_respects_status_and_event_filters(self):
+ SessionLocal = self._build_session_local()
+
+ with patch.object(integration_service, "SessionMockLocal", SessionLocal), patch.object(
+ integration_service.settings,
+ "integrations_enabled",
+ True,
+ ), patch.object(
+ integration_service.settings,
+ "integration_sync_delivery_enabled",
+ False,
+ ):
+ integration_service.upsert_email_integration_route(
+ event_type=ORDER_CREATED_EVENT,
+ recipient_email="ops@example.com",
+ )
+ integration_service.upsert_email_integration_route(
+ event_type=REVIEW_SCHEDULED_EVENT,
+ recipient_email="review@example.com",
+ )
+ await integration_service.emit_business_event(ORDER_CREATED_EVENT, {"numero_pedido": "PED-1"})
+ await integration_service.emit_business_event(REVIEW_SCHEDULED_EVENT, {"protocolo": "REV-1"})
+
+ fake_provider = type(
+ "FakeProvider",
+ (),
+ {
+ "send_email": AsyncMock(return_value={"message_id": "brevo-filtered"}),
+ },
+ )()
+
+ with patch.object(integration_service, "SessionMockLocal", SessionLocal), patch.object(
+ integration_service,
+ "_get_provider",
+ return_value=fake_provider,
+ ):
+ deliveries = await integration_service.process_pending_deliveries(
+ statuses=["pending"],
+ event_type=REVIEW_SCHEDULED_EVENT,
+ limit=10,
+ )
+
+ self.assertEqual(len(deliveries), 1)
+ self.assertEqual(deliveries[0]["event_type"], REVIEW_SCHEDULED_EVENT)
+ self.assertEqual(deliveries[0]["status"], "sent")
+
+ db = SessionLocal()
+ try:
+ rows = (
+ db.query(IntegrationDelivery)
+ .order_by(IntegrationDelivery.event_type.asc(), IntegrationDelivery.id.asc())
+ .all()
+ )
+ self.assertEqual(rows[0].status, "pending")
+ self.assertEqual(rows[1].status, "sent")
+ finally:
+ db.close()
+
+ def test_list_integration_routes_filters_disabled_routes(self):
+ SessionLocal = self._build_session_local()
+
+ with patch.object(integration_service, "SessionMockLocal", SessionLocal):
+ integration_service.upsert_email_integration_route(
+ event_type=ORDER_CREATED_EVENT,
+ recipient_email="ops@example.com",
+ enabled=True,
+ )
+ integration_service.upsert_email_integration_route(
+ event_type=REVIEW_SCHEDULED_EVENT,
+ recipient_email="review@example.com",
+ enabled=False,
+ )
+ routes = integration_service.list_integration_routes(enabled=False)
+
+ self.assertEqual(len(routes), 1)
+ self.assertEqual(routes[0]["event_type"], REVIEW_SCHEDULED_EVENT)
+ self.assertFalse(routes[0]["enabled"])
+
if __name__ == "__main__":
unittest.main()
diff --git a/tests/test_turn_decision_contract.py b/tests/test_turn_decision_contract.py
index 29c2b52..6b654f2 100644
--- a/tests/test_turn_decision_contract.py
+++ b/tests/test_turn_decision_contract.py
@@ -1,7 +1,7 @@
import os
import unittest
from types import SimpleNamespace
-from unittest.mock import patch
+from unittest.mock import AsyncMock, patch
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
@@ -13,6 +13,7 @@ from datetime import datetime, timedelta
from app.core.time_utils import utc_now
from app.db.mock_database import MockBase
from app.db.mock_models import RentalContract, RentalPayment, RentalVehicle
+from app.services.integrations.events import ORDER_CREATED_EVENT
from app.services.orchestration.conversation_policy import ConversationPolicy
from app.services.orchestration.entity_normalizer import EntityNormalizer
@@ -1846,6 +1847,98 @@ class TurnDecisionContractTests(unittest.IsolatedAsyncioTestCase):
self.assertTrue(prioritized)
+ def test_should_prioritize_review_flow_for_review_schedule_intent_without_prefilled_fields(self):
+ service = OrquestradorService.__new__(OrquestradorService)
+ service.state = FakeState()
+ service.normalizer = EntityNormalizer()
+ service._get_user_context = lambda user_id: None
+
+ prioritized = service._should_prioritize_review_flow(
+ turn_decision={"intent": "review_schedule", "domain": "review", "action": "ask_missing_fields"},
+ extracted_entities={
+ "generic_memory": {},
+ "review_fields": {},
+ "review_management_fields": {},
+ "order_fields": {},
+ "cancel_order_fields": {},
+ "intents": {},
+ },
+ user_id=1,
+ )
+
+ self.assertTrue(prioritized)
+
+ async def test_review_schedule_direct_flow_captures_email_side_effects_after_success(self):
+ state = FakeState(
+ entries={
+ "pending_review_drafts": {
+ 1: {
+ "payload": {
+ "placa": "ABC1D23",
+ "data_hora": "2026-03-25T10:00:00",
+ "modelo": "Onix",
+ "ano": 2022,
+ "km": 35000,
+ "revisao_previa_concessionaria": False,
+ },
+ "expires_at": utc_now() + timedelta(minutes=15),
+ }
+ }
+ },
+ contexts={
+ 1: {
+ "active_domain": "review",
+ "active_task": "review_schedule",
+ "generic_memory": {},
+ "shared_memory": {},
+ "order_queue": [],
+ "pending_order_selection": None,
+ "pending_switch": None,
+ "last_stock_results": [],
+ "selected_vehicle": None,
+ "expires_at": utc_now() + timedelta(minutes=15),
+ }
+ },
+ )
+ service = OrquestradorService.__new__(OrquestradorService)
+ service.state = state
+ service.normalizer = EntityNormalizer()
+ service.tool_executor = FakeToolExecutor(
+ result={
+ "protocolo": "REV-20260325-084279F3",
+ "placa": "ABC1D23",
+ "data_hora": "2026-03-25T10:00:00",
+ "modelo": "Onix",
+ "ano": 2022,
+ "km": 35000,
+ "valor_revisao": 906.0,
+ "status": "agendado",
+ "user_id": 1,
+ }
+ )
+ service._get_user_context = lambda user_id: state.get_user_context(user_id)
+ service._save_user_context = lambda user_id, context: state.save_user_context(user_id, context)
+ service._try_prefill_review_fields_from_memory = lambda user_id, payload: None
+ service._store_last_review_package = lambda user_id, payload: None
+ service._log_review_flow_source = lambda **kwargs: None
+ service._fallback_format_tool_result = lambda tool_name, tool_result: "Revisao agendada com sucesso."
+ captured = []
+ service._capture_successful_tool_side_effects = lambda **kwargs: captured.append(kwargs)
+
+ response = await service._try_collect_and_schedule_review(
+ message="ok",
+ user_id=1,
+ extracted_fields={},
+ intents={},
+ turn_decision={"intent": "review_schedule", "domain": "review", "action": "collect_review_schedule"},
+ )
+
+ self.assertEqual(response, "Revisao agendada com sucesso.")
+ self.assertEqual(len(captured), 1)
+ self.assertEqual(captured[0]["tool_name"], "agendar_revisao")
+ self.assertEqual(captured[0]["user_id"], 1)
+ self.assertEqual(captured[0]["tool_result"]["protocolo"], "REV-20260325-084279F3")
+
async def test_handle_message_prioritizes_review_management_over_model_answer_for_reschedule_intent(self):
state = FakeState(
contexts={
@@ -2604,7 +2697,8 @@ class TurnDecisionContractTests(unittest.IsolatedAsyncioTestCase):
user_id=1,
)
- self.assertEqual(response, "devolucao ok")
+ self.assertTrue(response.startswith("devolucao ok"))
+ self.assertIn("Se quiser, posso te enviar esse resumo por e-mail.", response)
self.assertEqual(
service.tool_executor.calls,
[
@@ -2703,7 +2797,8 @@ class TurnDecisionContractTests(unittest.IsolatedAsyncioTestCase):
user_id=1,
)
- self.assertEqual(response, "pagamento ok")
+ self.assertTrue(response.startswith("pagamento ok"))
+ self.assertIn("Se quiser, posso te enviar esse resumo por e-mail.", response)
self.assertEqual(
service.tool_executor.calls,
[
@@ -2809,7 +2904,8 @@ class TurnDecisionContractTests(unittest.IsolatedAsyncioTestCase):
user_id=1,
)
- self.assertEqual(response, "pagamento ok")
+ self.assertTrue(response.startswith("pagamento ok"))
+ self.assertIn("Se quiser, posso te enviar esse resumo por e-mail.", response)
self.assertEqual(
service.tool_executor.calls,
[
@@ -4691,7 +4787,122 @@ class TurnDecisionContractTests(unittest.IsolatedAsyncioTestCase):
self.assertIsNone(response)
-if __name__ == "__main__":
- unittest.main()
+class OrquestradorEmailCaptureTests(unittest.IsolatedAsyncioTestCase):
+ def _build_service(self, state=None):
+ service = OrquestradorService.__new__(OrquestradorService)
+ service.state = state or FakeState()
+ service.normalizer = EntityNormalizer()
+ service._turn_trace = {"request_id": "req-1"}
+ return service
+
+ def test_stage_email_capture_request_and_prompt_for_current_turn(self):
+ service = self._build_service()
+ service._get_saved_user_email = lambda user_id: None
+
+ service._stage_email_capture_request(
+ tool_name="realizar_pedido",
+ tool_result={"numero_pedido": "PED-1"},
+ user_id=7,
+ )
+
+ pending = service.state.get_entry("pending_email_capture_requests", 7)
+ self.assertIsNotNone(pending)
+ self.assertEqual(pending["event_type"], ORDER_CREATED_EVENT)
+ self.assertEqual(pending["payload"]["numero_pedido"], "PED-1")
+ self.assertEqual(pending["payload"]["user_id"], 7)
+
+ response = service._append_email_capture_prompt_if_needed(
+ response="Pedido criado com sucesso.",
+ user_id=7,
+ )
+ self.assertIn("Se quiser, posso te enviar esse resumo por e-mail.", response)
+
+ async def test_pending_email_capture_decline_clears_request(self):
+ state = FakeState(
+ entries={
+ "pending_email_capture_requests": {
+ 7: {
+ "request_id": "req-1",
+ "event_type": ORDER_CREATED_EVENT,
+ "payload": {"numero_pedido": "PED-1", "user_id": 7},
+ "expires_at": utc_now() + timedelta(minutes=15),
+ }
+ }
+ }
+ )
+ service = self._build_service(state=state)
+
+ response = await service._try_handle_pending_email_capture_message(
+ message="prefiro nao informar",
+ user_id=7,
+ )
+
+ self.assertEqual(response, "Tudo bem. Nao vou enviar este resumo por e-mail.")
+ self.assertIsNone(state.get_entry("pending_email_capture_requests", 7))
+
+ def test_ensure_user_email_routes_syncs_global_routes_only_once(self):
+ service = self._build_service()
+ service._get_user_record = lambda user_id: SimpleNamespace(
+ id=user_id,
+ email="cliente@example.com",
+ name="Cliente Teste",
+ )
+
+ with patch(
+ "app.services.orchestration.orquestrador_service.sync_user_email_integration_routes"
+ ) as sync_routes_mock:
+ service._ensure_user_email_routes(user_id=7)
+ service._ensure_user_email_routes(user_id=7)
+
+ sync_routes_mock.assert_called_once_with(
+ user_id=7,
+ recipient_email="cliente@example.com",
+ recipient_name="Cliente Teste",
+ )
+
+ async def test_pending_email_capture_success_saves_email_and_reemits_event(self):
+ state = FakeState(
+ entries={
+ "pending_email_capture_requests": {
+ 7: {
+ "request_id": "req-1",
+ "event_type": ORDER_CREATED_EVENT,
+ "payload": {"numero_pedido": "PED-1", "user_id": 7},
+ "expires_at": utc_now() + timedelta(minutes=15),
+ }
+ }
+ }
+ )
+ service = self._build_service(state=state)
+ saved = {}
+ ensured_routes = []
+
+ def fake_save_user_email(user_id: int | None, email: str | None):
+ saved["user_id"] = user_id
+ saved["email"] = email
+ return SimpleNamespace(id=user_id, email=email, name="Cliente Teste")
+
+ service._save_user_email = fake_save_user_email
+ service._ensure_user_email_routes = lambda user_id: ensured_routes.append(user_id)
+
+ with patch(
+ "app.services.orchestration.orquestrador_service.emit_business_event",
+ new=AsyncMock(return_value=[{"status": "sent", "provider_message_id": "brevo-1"}]),
+ ) as emit_business_event_mock:
+ response = await service._try_handle_pending_email_capture_message(
+ message="cliente@example.com",
+ user_id=7,
+ )
+
+ self.assertEqual(saved, {"user_id": 7, "email": "cliente@example.com"})
+ self.assertEqual(ensured_routes, [7])
+ emit_business_event_mock.assert_awaited_once_with(
+ event_type=ORDER_CREATED_EVENT,
+ payload={"numero_pedido": "PED-1", "user_id": 7},
+ )
+ self.assertIn("enviei este resumo por la", response)
+ self.assertIsNone(state.get_entry("pending_email_capture_requests", 7))
+if __name__ == "__main__":
+ unittest.main()