diff --git a/app/core/settings.py b/app/core/settings.py index f9ecf8e..c9e880a 100644 --- a/app/core/settings.py +++ b/app/core/settings.py @@ -55,6 +55,15 @@ class Settings(BaseSettings): redis_key_prefix: str = "orquestrador" redis_socket_timeout_seconds: int = 5 + # External integrations + integrations_enabled: bool = False + integration_sync_delivery_enabled: bool = True + brevo_api_key: str | None = None + brevo_base_url: str = "https://api.brevo.com/v3" + brevo_sender_email: str | None = None + brevo_sender_name: str = "Orquestrador" + brevo_request_timeout_seconds: int = 10 + @field_validator("debug", mode="before") @classmethod def parse_debug_aliases(cls, value): diff --git a/app/db/bootstrap.py b/app/db/bootstrap.py index 5b7f89b..14785e1 100644 --- a/app/db/bootstrap.py +++ b/app/db/bootstrap.py @@ -10,6 +10,8 @@ from app.db.models import Tool from app.db.mock_models import ( ConversationTurn, Customer, + IntegrationDelivery, + IntegrationRoute, Order, RentalContract, RentalFine, diff --git a/app/db/mock_models.py b/app/db/mock_models.py index fb7b331..7fc6831 100644 --- a/app/db/mock_models.py +++ b/app/db/mock_models.py @@ -157,6 +157,58 @@ class RentalFine(MockBase): created_at = Column(DateTime, server_default=func.current_timestamp()) +class IntegrationRoute(MockBase): + __tablename__ = "integration_routes" + __table_args__ = ( + UniqueConstraint( + "event_type", + "provider", + "recipient_email", + name="uq_integration_routes_target", + ), + ) + + id = Column(Integer, primary_key=True, index=True) + event_type = Column(String(80), nullable=False, index=True) + provider = Column(String(40), nullable=False, index=True) + enabled = Column(Boolean, nullable=False, default=True, index=True) + recipient_email = Column(String(255), nullable=False, index=True) + recipient_name = Column(String(120), nullable=True) + subject_template = Column(Text, nullable=True) + body_template = Column(Text, nullable=True) + provider_config_json = Column(Text, nullable=True) + created_at = Column(DateTime, server_default=func.current_timestamp()) + updated_at = Column( + DateTime, + server_default=func.current_timestamp(), + onupdate=func.current_timestamp(), + ) + + +class IntegrationDelivery(MockBase): + __tablename__ = "integration_deliveries" + + id = Column(Integer, primary_key=True, index=True) + route_id = Column(Integer, ForeignKey("integration_routes.id"), nullable=False, index=True) + event_type = Column(String(80), nullable=False, index=True) + provider = Column(String(40), nullable=False, index=True) + status = Column(String(20), nullable=False, default="pending", index=True) + payload_json = Column(Text, nullable=False) + rendered_subject = Column(Text, nullable=True) + rendered_body = Column(Text, nullable=True) + provider_message_id = Column(String(120), nullable=True, index=True) + idempotency_key = Column(String(64), unique=True, nullable=False, index=True) + attempts = Column(Integer, nullable=False, default=0) + last_error = Column(Text, nullable=True) + dispatched_at = Column(DateTime, nullable=True, index=True) + created_at = Column(DateTime, server_default=func.current_timestamp()) + updated_at = Column( + DateTime, + server_default=func.current_timestamp(), + onupdate=func.current_timestamp(), + ) + + class ConversationTurn(MockBase): __tablename__ = "conversation_turns" diff --git a/app/services/domain/order_service.py b/app/services/domain/order_service.py index 886c381..50c1d52 100644 --- a/app/services/domain/order_service.py +++ b/app/services/domain/order_service.py @@ -10,6 +10,8 @@ from app.db.mock_models import Order, User, Vehicle from app.services.domain.common import is_legacy_schema_issue from app.services.domain.credit_service import validar_cliente_venda from app.services.domain.tool_errors import raise_tool_http_error +from app.services.integrations.events import ORDER_CANCELLED_EVENT, ORDER_CREATED_EVENT +from app.services.integrations.service import publish_business_event_safely from app.services.orchestration.technical_normalizer import normalize_cpf from app.services.user.mock_customer_service import hydrate_mock_customer_from_cpf @@ -193,13 +195,15 @@ async def cancelar_pedido( db.commit() db.refresh(pedido) - return { + result = { "numero_pedido": pedido.numero_pedido, "user_id": pedido.user_id, "status": pedido.status, "motivo": pedido.motivo_cancelamento, "data_cancelamento": pedido.data_cancelamento.isoformat() if pedido.data_cancelamento else None, } + await publish_business_event_safely(ORDER_CANCELLED_EVENT, result) + return result finally: db.close() @@ -326,7 +330,7 @@ async def realizar_pedido( }, ) db.commit() - return { + result = { "numero_pedido": numero_pedido, "user_id": user_id, "cpf": cpf_norm, @@ -337,8 +341,10 @@ async def realizar_pedido( "valor_veiculo": valor_veiculo, "aprovado_credito": True, } + await publish_business_event_safely(ORDER_CREATED_EVENT, result) + return result - return { + result = { "numero_pedido": pedido.numero_pedido, "user_id": pedido.user_id, "cpf": pedido.cpf, @@ -349,6 +355,8 @@ async def realizar_pedido( "valor_veiculo": pedido.valor_veiculo, "aprovado_credito": True, } + await publish_business_event_safely(ORDER_CREATED_EVENT, result) + return result finally: _release_vehicle_reservation_lock(db, reservation_lock_name) db.close() diff --git a/app/services/domain/rental_service.py b/app/services/domain/rental_service.py index dc9b6cd..1562dcd 100644 --- a/app/services/domain/rental_service.py +++ b/app/services/domain/rental_service.py @@ -9,6 +9,12 @@ from app.core.time_utils import utc_now from app.db.mock_database import SessionMockLocal from app.db.mock_models import RentalContract, RentalFine, RentalPayment, RentalVehicle, User from app.services.domain.tool_errors import raise_tool_http_error +from app.services.integrations.events import ( + RENTAL_OPENED_EVENT, + RENTAL_PAYMENT_REGISTERED_EVENT, + RENTAL_RETURN_REGISTERED_EVENT, +) +from app.services.integrations.service import publish_business_event_safely from app.services.orchestration import technical_normalizer from app.services.user.mock_customer_service import hydrate_mock_customer_from_cpf @@ -389,7 +395,7 @@ async def abrir_locacao_aluguel( db.refresh(contract) db.refresh(vehicle) - return { + result = { "contrato_numero": contract.contrato_numero, "placa": contract.placa, "modelo_veiculo": contract.modelo_veiculo, @@ -403,6 +409,8 @@ async def abrir_locacao_aluguel( "cpf": contract.cpf, "nome_cliente": _normalize_text_field(nome_cliente), } + await publish_business_event_safely(RENTAL_OPENED_EVENT, result) + return result finally: db.close() @@ -458,7 +466,7 @@ async def registrar_devolucao_aluguel( if vehicle is not None: db.refresh(vehicle) - return { + result = { "contrato_numero": contract.contrato_numero, "placa": contract.placa, "modelo_veiculo": contract.modelo_veiculo, @@ -468,6 +476,8 @@ async def registrar_devolucao_aluguel( "status": contract.status, "status_veiculo": vehicle.status if vehicle is not None else None, } + await publish_business_event_safely(RENTAL_RETURN_REGISTERED_EVENT, result) + return result finally: db.close() @@ -529,7 +539,7 @@ async def registrar_pagamento_aluguel( db.add(record) db.commit() db.refresh(record) - return { + result = { "protocolo": record.protocolo, "rental_contract_id": record.rental_contract_id, "contrato_numero": record.contrato_numero, @@ -540,6 +550,8 @@ async def registrar_pagamento_aluguel( "identificador_comprovante": record.identificador_comprovante, "status": "registrado", } + await publish_business_event_safely(RENTAL_PAYMENT_REGISTERED_EVENT, result) + return result finally: db.close() diff --git a/app/services/domain/review_service.py b/app/services/domain/review_service.py index ef0a210..d80cf72 100644 --- a/app/services/domain/review_service.py +++ b/app/services/domain/review_service.py @@ -12,6 +12,8 @@ from app.db.mock_database import SessionMockLocal from app.db.mock_models import ReviewSchedule from app.services.domain.common import parse_bool from app.services.domain.tool_errors import build_tool_error, raise_tool_http_error +from app.services.integrations.events import REVIEW_SCHEDULED_EVENT +from app.services.integrations.service import publish_business_event_safely # Responsabilidade: tudo que é regra de revisão. @@ -313,7 +315,7 @@ async def agendar_revisao( db.commit() db.refresh(agendamento) - return { + result = { "protocolo": agendamento.protocolo, "user_id": agendamento.user_id, "placa": agendamento.placa, @@ -325,6 +327,8 @@ async def agendar_revisao( "revisao_previa_concessionaria": revisao_previa, "valor_revisao": valor_revisao, } + await publish_business_event_safely(REVIEW_SCHEDULED_EVENT, result) + return result finally: _release_review_slot_lock(db, review_slot_lock_name) db.close() diff --git a/app/services/integrations/__init__.py b/app/services/integrations/__init__.py new file mode 100644 index 0000000..ad1530a --- /dev/null +++ b/app/services/integrations/__init__.py @@ -0,0 +1,31 @@ +from app.services.integrations.events import ( + ORDER_CANCELLED_EVENT, + ORDER_CREATED_EVENT, + RENTAL_OPENED_EVENT, + RENTAL_PAYMENT_REGISTERED_EVENT, + RENTAL_RETURN_REGISTERED_EVENT, + REVIEW_SCHEDULED_EVENT, + SUPPORTED_EVENT_TYPES, +) +from app.services.integrations.service import ( + emit_business_event, + list_integration_routes, + process_pending_deliveries, + publish_business_event_safely, + upsert_email_integration_route, +) + +__all__ = [ + "ORDER_CANCELLED_EVENT", + "ORDER_CREATED_EVENT", + "RENTAL_OPENED_EVENT", + "RENTAL_PAYMENT_REGISTERED_EVENT", + "RENTAL_RETURN_REGISTERED_EVENT", + "REVIEW_SCHEDULED_EVENT", + "SUPPORTED_EVENT_TYPES", + "emit_business_event", + "list_integration_routes", + "process_pending_deliveries", + "publish_business_event_safely", + "upsert_email_integration_route", +] diff --git a/app/services/integrations/events.py b/app/services/integrations/events.py new file mode 100644 index 0000000..e3a7261 --- /dev/null +++ b/app/services/integrations/events.py @@ -0,0 +1,15 @@ +ORDER_CREATED_EVENT = "order.created" +ORDER_CANCELLED_EVENT = "order.cancelled" +REVIEW_SCHEDULED_EVENT = "review.scheduled" +RENTAL_OPENED_EVENT = "rental.opened" +RENTAL_PAYMENT_REGISTERED_EVENT = "rental.payment_registered" +RENTAL_RETURN_REGISTERED_EVENT = "rental.return_registered" + +SUPPORTED_EVENT_TYPES = ( + ORDER_CREATED_EVENT, + ORDER_CANCELLED_EVENT, + REVIEW_SCHEDULED_EVENT, + RENTAL_OPENED_EVENT, + RENTAL_PAYMENT_REGISTERED_EVENT, + RENTAL_RETURN_REGISTERED_EVENT, +) diff --git a/app/services/integrations/providers.py b/app/services/integrations/providers.py new file mode 100644 index 0000000..8dd0f51 --- /dev/null +++ b/app/services/integrations/providers.py @@ -0,0 +1,84 @@ +import httpx + +from app.core.settings import settings + + +class IntegrationProviderError(RuntimeError): + """Erro de transporte ou configuracao em providers externos.""" + + +class BrevoEmailProvider: + provider_name = "brevo_email" + + def __init__(self) -> None: + self.base_url = str(settings.brevo_base_url or "https://api.brevo.com/v3").rstrip("/") + self.api_key = str(settings.brevo_api_key or "").strip() + self.sender_email = str(settings.brevo_sender_email or "").strip() + self.sender_name = str(settings.brevo_sender_name or "Orquestrador").strip() or "Orquestrador" + self.timeout_seconds = max(1, int(settings.brevo_request_timeout_seconds or 10)) + + def is_configured(self) -> bool: + return bool(self.api_key and self.sender_email) + + async def send_email( + self, + *, + to_email: str, + to_name: str | None, + subject: str, + body: str, + tags: list[str] | None = None, + ) -> dict: + if not self.is_configured(): + raise IntegrationProviderError( + "Brevo nao configurado. Defina BREVO_API_KEY e BREVO_SENDER_EMAIL para enviar emails." + ) + + payload = { + "sender": { + "email": self.sender_email, + "name": self.sender_name, + }, + "to": [ + { + "email": str(to_email or "").strip(), + **({"name": str(to_name).strip()} if str(to_name or "").strip() else {}), + } + ], + "subject": str(subject or "").strip(), + "textContent": str(body or "").strip(), + } + if tags: + payload["tags"] = [str(tag).strip() for tag in tags if str(tag).strip()] + + headers = { + "accept": "application/json", + "content-type": "application/json", + "api-key": self.api_key, + } + + try: + async with httpx.AsyncClient(timeout=self.timeout_seconds) as client: + response = await client.post( + f"{self.base_url}/smtp/email", + headers=headers, + json=payload, + ) + except httpx.HTTPError as exc: + raise IntegrationProviderError(f"Falha ao enviar email via Brevo: {exc}") from exc + + if response.status_code >= 400: + raise IntegrationProviderError( + f"Brevo retornou erro {response.status_code}: {response.text[:300]}" + ) + + try: + data = response.json() + except ValueError: + data = {} + + return { + "provider": self.provider_name, + "message_id": data.get("messageId"), + "response": data, + } diff --git a/app/services/integrations/service.py b/app/services/integrations/service.py new file mode 100644 index 0000000..d10112a --- /dev/null +++ b/app/services/integrations/service.py @@ -0,0 +1,327 @@ +import hashlib +import json +import logging +from typing import Any + +from sqlalchemy import or_ + +from app.core.settings import settings +from app.core.time_utils import utc_now +from app.db.mock_database import SessionMockLocal +from app.db.mock_models import IntegrationDelivery, IntegrationRoute +from app.services.integrations.events import SUPPORTED_EVENT_TYPES +from app.services.integrations.providers import BrevoEmailProvider, IntegrationProviderError +from app.services.integrations.templates import render_email_content + + +logger = logging.getLogger(__name__) + +_PROVIDER_FACTORIES = { + "brevo_email": BrevoEmailProvider, +} + + +def _clean_text(value: str | None) -> str | None: + text = str(value or "").strip() + return text or None + + +def _serialize_json(value: dict[str, Any] | None) -> str: + return json.dumps(value or {}, ensure_ascii=True, sort_keys=True, separators=(",", ":"), default=str) + + +def _deserialize_json(value: str | None) -> dict[str, Any]: + text = str(value or "").strip() + if not text: + return {} + try: + payload = json.loads(text) + except (TypeError, ValueError): + return {} + return payload if isinstance(payload, dict) else {} + + +def _validate_event_type(event_type: str) -> str: + normalized = _clean_text(event_type) + if normalized not in SUPPORTED_EVENT_TYPES: + raise ValueError(f"unsupported integration event: {event_type}") + return normalized + + +def _build_idempotency_key(*, route_id: int, event_type: str, payload: dict[str, Any]) -> str: + raw = f"{route_id}:{event_type}:{_serialize_json(payload)}" + return hashlib.sha256(raw.encode("utf-8")).hexdigest() + + +def _serialize_route(route: IntegrationRoute) -> dict[str, Any]: + return { + "id": route.id, + "event_type": route.event_type, + "provider": route.provider, + "enabled": bool(route.enabled), + "recipient_email": route.recipient_email, + "recipient_name": route.recipient_name, + "subject_template": route.subject_template, + "body_template": route.body_template, + "provider_config": _deserialize_json(route.provider_config_json), + } + + +def _serialize_delivery(delivery: IntegrationDelivery) -> dict[str, Any]: + return { + "id": delivery.id, + "route_id": delivery.route_id, + "event_type": delivery.event_type, + "provider": delivery.provider, + "status": delivery.status, + "attempts": int(delivery.attempts or 0), + "payload": _deserialize_json(delivery.payload_json), + "rendered_subject": delivery.rendered_subject, + "rendered_body": delivery.rendered_body, + "provider_message_id": delivery.provider_message_id, + "last_error": delivery.last_error, + "idempotency_key": delivery.idempotency_key, + "dispatched_at": delivery.dispatched_at.isoformat() if delivery.dispatched_at else None, + "created_at": delivery.created_at.isoformat() if delivery.created_at else None, + } + + +def _get_provider(provider_name: str): + factory = _PROVIDER_FACTORIES.get(_clean_text(provider_name) or "") + if not factory: + raise IntegrationProviderError(f"Provider de integracao nao suportado: {provider_name}") + return factory() + + +def list_integration_routes(*, event_type: str | None = None, provider: str | None = None) -> list[dict[str, Any]]: + db = SessionMockLocal() + try: + query = db.query(IntegrationRoute) + normalized_event_type = _clean_text(event_type) + if normalized_event_type: + query = query.filter(IntegrationRoute.event_type == normalized_event_type) + normalized_provider = _clean_text(provider) + if normalized_provider: + query = query.filter(IntegrationRoute.provider == normalized_provider) + routes = query.order_by(IntegrationRoute.event_type.asc(), IntegrationRoute.id.asc()).all() + return [_serialize_route(route) for route in routes] + finally: + db.close() + + +def upsert_email_integration_route( + *, + event_type: str, + recipient_email: str, + recipient_name: str | None = None, + subject_template: str | None = None, + body_template: str | None = None, + enabled: bool = True, + provider: str = "brevo_email", + provider_config: dict[str, Any] | None = None, +) -> dict[str, Any]: + normalized_event_type = _validate_event_type(event_type) + normalized_provider = _clean_text(provider) or "brevo_email" + normalized_email = _clean_text(recipient_email) + if not normalized_email: + raise ValueError("recipient_email is required") + + db = SessionMockLocal() + try: + route = ( + db.query(IntegrationRoute) + .filter(IntegrationRoute.event_type == normalized_event_type) + .filter(IntegrationRoute.provider == normalized_provider) + .filter(IntegrationRoute.recipient_email == normalized_email) + .first() + ) + if route is None: + route = IntegrationRoute( + event_type=normalized_event_type, + provider=normalized_provider, + recipient_email=normalized_email, + ) + db.add(route) + + route.enabled = bool(enabled) + route.recipient_name = _clean_text(recipient_name) + route.subject_template = _clean_text(subject_template) + route.body_template = _clean_text(body_template) + route.provider_config_json = _serialize_json(provider_config) + db.commit() + db.refresh(route) + return _serialize_route(route) + finally: + db.close() + + +async def emit_business_event(event_type: str, payload: dict[str, Any] | None) -> list[dict[str, Any]]: + if not settings.integrations_enabled: + return [] + + normalized_event_type = _validate_event_type(event_type) + normalized_payload = dict(payload or {}) + created_ids: list[int] = [] + + db = SessionMockLocal() + try: + routes = ( + db.query(IntegrationRoute) + .filter(IntegrationRoute.event_type == normalized_event_type) + .filter(IntegrationRoute.enabled.is_(True)) + .all() + ) + for route in routes: + idempotency_key = _build_idempotency_key( + route_id=route.id, + event_type=normalized_event_type, + payload=normalized_payload, + ) + existing = ( + db.query(IntegrationDelivery) + .filter(IntegrationDelivery.idempotency_key == idempotency_key) + .first() + ) + if existing is not None: + created_ids.append(existing.id) + continue + + delivery = IntegrationDelivery( + route_id=route.id, + event_type=normalized_event_type, + provider=route.provider, + status="pending", + payload_json=_serialize_json(normalized_payload), + idempotency_key=idempotency_key, + ) + db.add(delivery) + db.flush() + created_ids.append(delivery.id) + + db.commit() + finally: + db.close() + + if settings.integration_sync_delivery_enabled and created_ids: + dispatched: list[dict[str, Any]] = [] + for delivery_id in created_ids: + delivered = await dispatch_delivery(delivery_id) + if delivered is not None: + dispatched.append(delivered) + return dispatched + + if not created_ids: + return [] + + db = SessionMockLocal() + try: + rows = ( + db.query(IntegrationDelivery) + .filter(IntegrationDelivery.id.in_(created_ids)) + .order_by(IntegrationDelivery.id.asc()) + .all() + ) + return [_serialize_delivery(row) for row in rows] + finally: + db.close() + + +async def dispatch_delivery(delivery_id: int) -> dict[str, Any] | None: + db = SessionMockLocal() + try: + delivery = db.query(IntegrationDelivery).filter(IntegrationDelivery.id == delivery_id).first() + if delivery is None: + return None + if delivery.status == "sent": + return _serialize_delivery(delivery) + + route = db.query(IntegrationRoute).filter(IntegrationRoute.id == delivery.route_id).first() + if route is None or not route.enabled: + delivery.status = "skipped" + delivery.last_error = "Route disabled or not found." + delivery.attempts = int(delivery.attempts or 0) + 1 + db.commit() + db.refresh(delivery) + return _serialize_delivery(delivery) + + payload = _deserialize_json(delivery.payload_json) + subject, body = render_email_content( + event_type=delivery.event_type, + payload=payload, + subject_template=route.subject_template, + body_template=route.body_template, + ) + delivery.rendered_subject = subject + delivery.rendered_body = body + delivery.attempts = int(delivery.attempts or 0) + 1 + + try: + provider = _get_provider(route.provider) + result = await provider.send_email( + to_email=route.recipient_email, + to_name=route.recipient_name, + subject=subject, + body=body, + tags=[delivery.event_type], + ) + except IntegrationProviderError as exc: + delivery.status = "failed" + delivery.last_error = str(exc) + db.commit() + db.refresh(delivery) + return _serialize_delivery(delivery) + + delivery.status = "sent" + delivery.last_error = None + delivery.provider_message_id = _clean_text(result.get("message_id")) + delivery.dispatched_at = utc_now() + db.commit() + db.refresh(delivery) + return _serialize_delivery(delivery) + finally: + db.close() + + +async def process_pending_deliveries( + *, + limit: int = 20, + delivery_ids: list[int] | None = None, +) -> list[dict[str, Any]]: + db = SessionMockLocal() + try: + query = db.query(IntegrationDelivery) + if delivery_ids: + query = query.filter(IntegrationDelivery.id.in_(delivery_ids)) + else: + query = query.filter( + or_( + IntegrationDelivery.status == "pending", + IntegrationDelivery.status == "failed", + ) + ) + query = query.order_by(IntegrationDelivery.id.asc()) + if limit and int(limit) > 0: + query = query.limit(max(1, int(limit))) + if delivery_ids: + query = query.order_by(IntegrationDelivery.id.asc()) + selected_ids = [row.id for row in query.all()] + finally: + db.close() + + results: list[dict[str, Any]] = [] + for delivery_id in selected_ids: + dispatched = await dispatch_delivery(delivery_id) + if dispatched is not None: + results.append(dispatched) + return results + + +async def publish_business_event_safely(event_type: str, payload: dict[str, Any] | None) -> list[dict[str, Any]]: + try: + return await emit_business_event(event_type, payload) + except Exception: + logger.exception( + "Falha ao publicar evento de integracao.", + extra={"event_type": event_type}, + ) + return [] diff --git a/app/services/integrations/templates.py b/app/services/integrations/templates.py new file mode 100644 index 0000000..2a0f29e --- /dev/null +++ b/app/services/integrations/templates.py @@ -0,0 +1,128 @@ +import json +from collections.abc import Mapping +from typing import Any + +from app.services.integrations.events import ( + ORDER_CANCELLED_EVENT, + ORDER_CREATED_EVENT, + RENTAL_OPENED_EVENT, + RENTAL_PAYMENT_REGISTERED_EVENT, + RENTAL_RETURN_REGISTERED_EVENT, + REVIEW_SCHEDULED_EVENT, +) + + +DEFAULT_EMAIL_TEMPLATES = { + ORDER_CREATED_EVENT: { + "subject": "[Orquestrador] Pedido criado {numero_pedido}", + "body": """Um pedido de veiculo foi criado com sucesso. + +Numero: {numero_pedido} +Veiculo: {modelo_veiculo} +Valor: R$ {valor_veiculo} +CPF: {cpf} +Status: {status} +Status do veiculo: {status_veiculo}""", + }, + ORDER_CANCELLED_EVENT: { + "subject": "[Orquestrador] Pedido cancelado {numero_pedido}", + "body": """Um pedido foi cancelado. + +Numero: {numero_pedido} +Status: {status} +Motivo: {motivo} +Data cancelamento: {data_cancelamento}""", + }, + REVIEW_SCHEDULED_EVENT: { + "subject": "[Orquestrador] Revisao agendada {protocolo}", + "body": """Uma revisao foi agendada. + +Protocolo: {protocolo} +Placa: {placa} +Data e hora: {data_hora} +Modelo: {modelo} +Ano: {ano} +KM: {km} +Valor estimado: R$ {valor_revisao}""", + }, + RENTAL_OPENED_EVENT: { + "subject": "[Orquestrador] Locacao aberta {contrato_numero}", + "body": """Uma locacao foi aberta. + +Contrato: {contrato_numero} +Veiculo: {modelo_veiculo} +Placa: {placa} +Inicio: {data_inicio} +Devolucao prevista: {data_fim_prevista} +Diaria: R$ {valor_diaria} +Valor previsto: R$ {valor_previsto} +Status: {status}""", + }, + RENTAL_PAYMENT_REGISTERED_EVENT: { + "subject": "[Orquestrador] Pagamento de aluguel {protocolo}", + "body": """Um pagamento de aluguel foi registrado. + +Protocolo: {protocolo} +Contrato: {contrato_numero} +Placa: {placa} +Valor: R$ {valor} +Data pagamento: {data_pagamento} +Favorecido: {favorecido}""", + }, + RENTAL_RETURN_REGISTERED_EVENT: { + "subject": "[Orquestrador] Devolucao registrada {contrato_numero}", + "body": """Uma devolucao de locacao foi registrada. + +Contrato: {contrato_numero} +Veiculo: {modelo_veiculo} +Placa: {placa} +Data devolucao: {data_devolucao} +Valor previsto: R$ {valor_previsto} +Valor final: R$ {valor_final} +Status: {status}""", + }, +} + + +class _SafeTemplateData(dict): + def __missing__(self, key: str) -> str: + return "" + + +def _normalize_template_value(value: Any) -> str: + if value is None: + return "" + if isinstance(value, bool): + return "sim" if value else "nao" + if isinstance(value, (list, tuple, dict)): + return json.dumps(value, ensure_ascii=True, default=str, sort_keys=True) + return str(value) + + +def build_template_context(payload: Mapping[str, Any] | None) -> dict[str, str]: + context = _SafeTemplateData() + for key, value in dict(payload or {}).items(): + context[str(key)] = _normalize_template_value(value) + return context + + +def render_email_content( + *, + event_type: str, + payload: Mapping[str, Any] | None, + subject_template: str | None = None, + body_template: str | None = None, +) -> tuple[str, str]: + defaults = DEFAULT_EMAIL_TEMPLATES.get( + event_type, + { + "subject": "[Orquestrador] Evento {event_type}", + "body": "Payload do evento {event_type}:\n{payload_json}", + }, + ) + context = build_template_context(payload) + context.setdefault("event_type", event_type) + context.setdefault("payload_json", _normalize_template_value(payload)) + subject = str(subject_template or defaults["subject"]).format_map(context).strip() + body = str(body_template or defaults["body"]).format_map(context).strip() + return subject, body diff --git a/scripts/process_integration_deliveries.py b/scripts/process_integration_deliveries.py new file mode 100644 index 0000000..d349625 --- /dev/null +++ b/scripts/process_integration_deliveries.py @@ -0,0 +1,21 @@ +import argparse +import asyncio +import json + +from app.services.integrations.service import process_pending_deliveries + + +async def _main_async(limit: int) -> None: + deliveries = await process_pending_deliveries(limit=limit) + print(json.dumps(deliveries, ensure_ascii=True, indent=2, sort_keys=True)) + + +def main() -> None: + parser = argparse.ArgumentParser(description="Processa entregas pendentes do outbox de integracoes.") + parser.add_argument("--limit", type=int, default=20) + args = parser.parse_args() + asyncio.run(_main_async(limit=args.limit)) + + +if __name__ == "__main__": + main() diff --git a/scripts/upsert_integration_route.py b/scripts/upsert_integration_route.py new file mode 100644 index 0000000..d0b5626 --- /dev/null +++ b/scripts/upsert_integration_route.py @@ -0,0 +1,30 @@ +import argparse +import json + +from app.services.integrations.events import SUPPORTED_EVENT_TYPES +from app.services.integrations.service import upsert_email_integration_route + + +def main() -> None: + parser = argparse.ArgumentParser(description="Cria ou atualiza uma rota de integracao por email.") + parser.add_argument("--event", required=True, choices=SUPPORTED_EVENT_TYPES) + parser.add_argument("--recipient", required=True) + parser.add_argument("--name") + parser.add_argument("--subject-template") + parser.add_argument("--body-template") + parser.add_argument("--disabled", action="store_true") + args = parser.parse_args() + + route = upsert_email_integration_route( + event_type=args.event, + recipient_email=args.recipient, + recipient_name=args.name, + subject_template=args.subject_template, + body_template=args.body_template, + enabled=not args.disabled, + ) + print(json.dumps(route, ensure_ascii=True, indent=2, sort_keys=True)) + + +if __name__ == "__main__": + main() diff --git a/tests/test_integration_domain_hooks.py b/tests/test_integration_domain_hooks.py new file mode 100644 index 0000000..17837cd --- /dev/null +++ b/tests/test_integration_domain_hooks.py @@ -0,0 +1,93 @@ +import os +import unittest +from unittest.mock import AsyncMock, patch + +os.environ.setdefault("DEBUG", "false") + +from app.db.mock_models import Vehicle +from app.services.domain import order_service, rental_service, review_service +from tests.test_order_service import FakeSession +from tests.test_rental_service import RentalServiceTests +from tests.test_review_service import ReviewLockingSession + + +class IntegrationDomainHookTests(unittest.IsolatedAsyncioTestCase): + async def test_realizar_pedido_publica_evento_apos_sucesso(self): + vehicle = Vehicle(id=8, modelo="Toyota Corolla 2024", categoria="suv", preco=76087.0) + session = FakeSession(vehicle=vehicle) + fake_publish = AsyncMock(return_value=[]) + + async def fake_hydrate_mock_customer_from_cpf(cpf: str, user_id: int | None = None): + return {"cpf": cpf, "user_id": user_id} + + async def fake_validar_cliente_venda(cpf: str, valor_veiculo: float): + return {"aprovado": True} + + with patch.object(order_service, "SessionMockLocal", return_value=session), patch.object( + order_service, + "hydrate_mock_customer_from_cpf", + new=fake_hydrate_mock_customer_from_cpf, + ), patch.object( + order_service, + "validar_cliente_venda", + new=fake_validar_cliente_venda, + ), patch.object(order_service, "_get_vehicle_for_update", return_value=vehicle), patch.object( + order_service, + "_get_active_order_for_vehicle", + return_value=None, + ), patch.object(order_service, "publish_business_event_safely", fake_publish): + await order_service.realizar_pedido(cpf="123.456.789-09", vehicle_id=8) + + fake_publish.assert_awaited_once() + self.assertEqual(fake_publish.await_args.args[0], order_service.ORDER_CREATED_EVENT) + + async def test_agendar_revisao_publica_evento_apos_sucesso(self): + session = ReviewLockingSession(query_results=[None, None]) + fake_publish = AsyncMock(return_value=[]) + + with patch.object(review_service, "SessionMockLocal", return_value=session), patch.object( + review_service, + "publish_business_event_safely", + fake_publish, + ): + await review_service.agendar_revisao( + placa="ABC1234", + data_hora="18/03/2026 09:00", + modelo="Onix", + ano=2022, + km=15000, + revisao_previa_concessionaria=False, + user_id=7, + ) + + fake_publish.assert_awaited_once() + self.assertEqual(fake_publish.await_args.args[0], review_service.REVIEW_SCHEDULED_EVENT) + + async def test_abrir_locacao_publica_evento_apos_sucesso(self): + helper = RentalServiceTests() + SessionLocal = helper._build_session_local() + db = SessionLocal() + try: + vehicle = helper._create_rental_vehicle(db) + vehicle_placa = vehicle.placa + finally: + db.close() + + fake_publish = AsyncMock(return_value=[]) + with patch("app.services.domain.rental_service.SessionMockLocal", SessionLocal), patch.object( + rental_service, + "publish_business_event_safely", + fake_publish, + ): + await rental_service.abrir_locacao_aluguel( + placa=vehicle_placa, + data_inicio="17/03/2026 10:00", + data_fim_prevista="20/03/2026 10:00", + ) + + fake_publish.assert_awaited_once() + self.assertEqual(fake_publish.await_args.args[0], rental_service.RENTAL_OPENED_EVENT) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_integration_service.py b/tests/test_integration_service.py new file mode 100644 index 0000000..fe4706a --- /dev/null +++ b/tests/test_integration_service.py @@ -0,0 +1,136 @@ +import os +import unittest +from unittest.mock import AsyncMock, patch + +from sqlalchemy import create_engine +from sqlalchemy.orm import sessionmaker +from sqlalchemy.pool import StaticPool + +os.environ.setdefault("DEBUG", "false") + +from app.db.mock_database import MockBase +from app.db.mock_models import IntegrationDelivery +from app.services.integrations.events import ORDER_CREATED_EVENT +from app.services.integrations import service as integration_service + + +class IntegrationServiceTests(unittest.IsolatedAsyncioTestCase): + def _build_session_local(self): + engine = create_engine( + "sqlite://", + connect_args={"check_same_thread": False}, + poolclass=StaticPool, + ) + SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine) + MockBase.metadata.create_all(bind=engine) + self.addCleanup(engine.dispose) + return SessionLocal + + async def test_emit_business_event_creates_and_dispatches_delivery(self): + SessionLocal = self._build_session_local() + + with patch.object(integration_service, "SessionMockLocal", SessionLocal), patch.object( + integration_service.settings, + "integrations_enabled", + True, + ), patch.object( + integration_service.settings, + "integration_sync_delivery_enabled", + True, + ), patch.object( + integration_service, + "_get_provider", + return_value=type( + "FakeProvider", + (), + { + "send_email": AsyncMock(return_value={"message_id": "brevo-123"}), + }, + )(), + ): + integration_service.upsert_email_integration_route( + event_type=ORDER_CREATED_EVENT, + recipient_email="ops@example.com", + ) + deliveries = await integration_service.emit_business_event( + ORDER_CREATED_EVENT, + { + "numero_pedido": "PED-1", + "modelo_veiculo": "Fiat Argo 2024", + "valor_veiculo": 67739.0, + "status": "Ativo", + "status_veiculo": "Reservado", + }, + ) + + self.assertEqual(len(deliveries), 1) + self.assertEqual(deliveries[0]["status"], "sent") + self.assertEqual(deliveries[0]["provider_message_id"], "brevo-123") + + db = SessionLocal() + try: + stored = db.query(IntegrationDelivery).one() + self.assertEqual(stored.status, "sent") + self.assertEqual(stored.provider_message_id, "brevo-123") + finally: + db.close() + + async def test_emit_business_event_deduplicates_by_route_and_payload(self): + SessionLocal = self._build_session_local() + + with patch.object(integration_service, "SessionMockLocal", SessionLocal), patch.object( + integration_service.settings, + "integrations_enabled", + True, + ), patch.object( + integration_service.settings, + "integration_sync_delivery_enabled", + False, + ): + integration_service.upsert_email_integration_route( + event_type=ORDER_CREATED_EVENT, + recipient_email="ops@example.com", + ) + await integration_service.emit_business_event(ORDER_CREATED_EVENT, {"numero_pedido": "PED-1"}) + await integration_service.emit_business_event(ORDER_CREATED_EVENT, {"numero_pedido": "PED-1"}) + + db = SessionLocal() + try: + rows = db.query(IntegrationDelivery).all() + self.assertEqual(len(rows), 1) + self.assertEqual(rows[0].status, "pending") + finally: + db.close() + + async def test_process_pending_deliveries_marks_failure_when_provider_fails(self): + SessionLocal = self._build_session_local() + + with patch.object(integration_service, "SessionMockLocal", SessionLocal), patch.object( + integration_service.settings, + "integrations_enabled", + True, + ), patch.object( + integration_service.settings, + "integration_sync_delivery_enabled", + False, + ): + integration_service.upsert_email_integration_route( + event_type=ORDER_CREATED_EVENT, + recipient_email="ops@example.com", + ) + await integration_service.emit_business_event(ORDER_CREATED_EVENT, {"numero_pedido": "PED-1"}) + + with patch.object(integration_service, "SessionMockLocal", SessionLocal), patch.object( + integration_service, + "_get_provider", + side_effect=integration_service.IntegrationProviderError("brevo offline"), + ): + deliveries = await integration_service.process_pending_deliveries(limit=10) + + self.assertEqual(len(deliveries), 1) + self.assertEqual(deliveries[0]["status"], "failed") + self.assertIn("brevo offline", deliveries[0]["last_error"]) + + +if __name__ == "__main__": + unittest.main()