You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
orquestrador/app/services/integrations/service.py

601 lines
21 KiB
Python

import hashlib
import json
import logging
from typing import Any
from app.core.settings import settings
from app.core.time_utils import utc_now
from app.db.mock_database import SessionMockLocal
from app.db.mock_models import IntegrationDelivery, IntegrationRoute
from app.repositories.user_repository import UserRepository
from app.services.integrations.events import (
ORDER_CANCELLED_EVENT,
ORDER_CREATED_EVENT,
RENTAL_OPENED_EVENT,
RENTAL_PAYMENT_REGISTERED_EVENT,
RENTAL_RETURN_REGISTERED_EVENT,
REVIEW_SCHEDULED_EVENT,
SUPPORTED_EVENT_TYPES,
)
from app.services.integrations.providers import BrevoEmailProvider, IntegrationProviderError
from app.services.integrations.templates import render_email_content
logger = logging.getLogger(__name__)
_PROVIDER_FACTORIES = {
"brevo_email": BrevoEmailProvider,
}
SUPPORTED_DELIVERY_STATUSES = (
"pending",
"failed",
"sent",
"skipped",
)
USER_PROFILE_ROUTE_SCOPE = "user_profile"
USER_PROFILE_EVENT_TYPES = (
ORDER_CREATED_EVENT,
ORDER_CANCELLED_EVENT,
REVIEW_SCHEDULED_EVENT,
RENTAL_OPENED_EVENT,
RENTAL_PAYMENT_REGISTERED_EVENT,
RENTAL_RETURN_REGISTERED_EVENT,
)
DYNAMIC_USER_ROUTE_EMAIL = "dynamic:user_profile"
DYNAMIC_USER_ROUTE_NAME = "Usuario do fluxo"
def _clean_text(value: str | None) -> str | None:
text = str(value or "").strip()
return text or None
def _serialize_json(value: dict[str, Any] | None) -> str:
return json.dumps(value or {}, ensure_ascii=True, sort_keys=True, separators=(",", ":"), default=str)
def _deserialize_json(value: str | None) -> dict[str, Any]:
text = str(value or "").strip()
if not text:
return {}
try:
payload = json.loads(text)
except (TypeError, ValueError):
return {}
return payload if isinstance(payload, dict) else {}
def _validate_event_type(event_type: str) -> str:
normalized = _clean_text(event_type)
if normalized not in SUPPORTED_EVENT_TYPES:
raise ValueError(f"unsupported integration event: {event_type}")
return normalized
def _normalize_int(value: Any) -> int | None:
if value is None:
return None
text = str(value).strip()
if not text:
return None
try:
return int(text)
except (TypeError, ValueError):
return None
def _normalize_delivery_statuses(statuses: str | list[str] | tuple[str, ...] | None) -> list[str] | None:
if statuses is None:
return None
if isinstance(statuses, str):
candidates = [statuses]
else:
candidates = list(statuses)
normalized_statuses: list[str] = []
for status in candidates:
normalized = _clean_text(status)
if not normalized:
continue
if normalized not in SUPPORTED_DELIVERY_STATUSES:
raise ValueError(f"unsupported integration delivery status: {status}")
if normalized not in normalized_statuses:
normalized_statuses.append(normalized)
return normalized_statuses or None
def _build_idempotency_key(*, route_id: int, event_type: str, payload: dict[str, Any]) -> str:
raw = f"{route_id}:{event_type}:{_serialize_json(payload)}"
return hashlib.sha256(raw.encode("utf-8")).hexdigest()
def _recipient_scope(provider_config: dict[str, Any] | None) -> str:
scope = str((provider_config or {}).get("recipient_scope") or "").strip().lower()
return scope or "fixed"
def _serialize_route(route: IntegrationRoute) -> dict[str, Any]:
provider_config = _deserialize_json(route.provider_config_json)
return {
"id": route.id,
"event_type": route.event_type,
"provider": route.provider,
"enabled": bool(route.enabled),
"recipient_email": route.recipient_email,
"recipient_name": route.recipient_name,
"recipient_scope": _recipient_scope(provider_config),
"subject_template": route.subject_template,
"body_template": route.body_template,
"provider_config": provider_config,
"created_at": route.created_at.isoformat() if route.created_at else None,
"updated_at": route.updated_at.isoformat() if route.updated_at else None,
}
def _is_user_profile_route_config(provider_config: dict[str, Any] | None) -> bool:
return _recipient_scope(provider_config) == USER_PROFILE_ROUTE_SCOPE
def _is_legacy_user_profile_route_config(provider_config: dict[str, Any] | None) -> bool:
return _is_user_profile_route_config(provider_config) and _normalize_int((provider_config or {}).get("user_id")) is not None
def _resolve_user_profile_recipient(
db,
*,
payload: dict[str, Any],
route: IntegrationRoute,
) -> dict[str, str] | None:
user_id = _normalize_int(payload.get("user_id"))
if user_id is None:
return None
user = UserRepository(db).get_by_id(user_id=user_id)
if user is None:
return None
email = _clean_text(getattr(user, "email", None))
if not email:
return None
recipient = {"email": email}
recipient_name = _clean_text(getattr(user, "name", None)) or _clean_text(route.recipient_name)
if recipient_name:
recipient["name"] = recipient_name
return recipient
def _resolve_route_recipient(
db,
*,
route: IntegrationRoute,
payload: dict[str, Any],
provider_config: dict[str, Any] | None = None,
) -> dict[str, str] | None:
normalized_provider_config = provider_config if isinstance(provider_config, dict) else _deserialize_json(route.provider_config_json)
if _is_user_profile_route_config(normalized_provider_config):
return _resolve_user_profile_recipient(
db,
payload=payload,
route=route,
)
email = _clean_text(route.recipient_email)
if not email:
return None
recipient = {"email": email}
recipient_name = _clean_text(route.recipient_name)
if recipient_name:
recipient["name"] = recipient_name
return recipient
def _serialize_delivery(delivery: IntegrationDelivery) -> dict[str, Any]:
return {
"id": delivery.id,
"route_id": delivery.route_id,
"event_type": delivery.event_type,
"provider": delivery.provider,
"status": delivery.status,
"attempts": int(delivery.attempts or 0),
"payload": _deserialize_json(delivery.payload_json),
"recipient_email": delivery.recipient_email,
"recipient_name": delivery.recipient_name,
"rendered_subject": delivery.rendered_subject,
"rendered_body": delivery.rendered_body,
"provider_message_id": delivery.provider_message_id,
"last_error": delivery.last_error,
"idempotency_key": delivery.idempotency_key,
"dispatched_at": delivery.dispatched_at.isoformat() if delivery.dispatched_at else None,
"created_at": delivery.created_at.isoformat() if delivery.created_at else None,
"updated_at": delivery.updated_at.isoformat() if delivery.updated_at else None,
}
def _get_provider(provider_name: str):
factory = _PROVIDER_FACTORIES.get(_clean_text(provider_name) or "")
if not factory:
raise IntegrationProviderError(f"Provider de integracao nao suportado: {provider_name}")
return factory()
def list_integration_routes(
*,
event_type: str | None = None,
provider: str | None = None,
enabled: bool | None = None,
) -> list[dict[str, Any]]:
db = SessionMockLocal()
try:
query = db.query(IntegrationRoute)
normalized_event_type = _validate_event_type(event_type) if _clean_text(event_type) else None
if normalized_event_type:
query = query.filter(IntegrationRoute.event_type == normalized_event_type)
normalized_provider = _clean_text(provider)
if normalized_provider:
query = query.filter(IntegrationRoute.provider == normalized_provider)
if enabled is not None:
query = query.filter(IntegrationRoute.enabled.is_(bool(enabled)))
routes = query.order_by(IntegrationRoute.event_type.asc(), IntegrationRoute.id.asc()).all()
return [_serialize_route(route) for route in routes]
finally:
db.close()
def list_integration_deliveries(
*,
statuses: str | list[str] | tuple[str, ...] | None = None,
event_type: str | None = None,
provider: str | None = None,
route_id: int | None = None,
limit: int = 50,
) -> list[dict[str, Any]]:
normalized_statuses = _normalize_delivery_statuses(statuses)
normalized_event_type = _validate_event_type(event_type) if _clean_text(event_type) else None
normalized_provider = _clean_text(provider)
normalized_route_id = int(route_id) if route_id is not None else None
normalized_limit = max(1, int(limit)) if limit and int(limit) > 0 else None
db = SessionMockLocal()
try:
query = db.query(IntegrationDelivery)
if normalized_statuses:
query = query.filter(IntegrationDelivery.status.in_(normalized_statuses))
if normalized_event_type:
query = query.filter(IntegrationDelivery.event_type == normalized_event_type)
if normalized_provider:
query = query.filter(IntegrationDelivery.provider == normalized_provider)
if normalized_route_id is not None:
query = query.filter(IntegrationDelivery.route_id == normalized_route_id)
query = query.order_by(IntegrationDelivery.id.desc())
if normalized_limit is not None:
query = query.limit(normalized_limit)
rows = query.all()
return [_serialize_delivery(row) for row in rows]
finally:
db.close()
def upsert_email_integration_route(
*,
event_type: str,
recipient_email: str,
recipient_name: str | None = None,
subject_template: str | None = None,
body_template: str | None = None,
enabled: bool = True,
provider: str = "brevo_email",
provider_config: dict[str, Any] | None = None,
) -> dict[str, Any]:
normalized_event_type = _validate_event_type(event_type)
normalized_provider = _clean_text(provider) or "brevo_email"
normalized_provider_config = dict(provider_config or {})
normalized_email = _clean_text(recipient_email)
if not normalized_email and _is_user_profile_route_config(normalized_provider_config):
normalized_email = DYNAMIC_USER_ROUTE_EMAIL
if not normalized_email:
raise ValueError("recipient_email is required")
db = SessionMockLocal()
try:
route = (
db.query(IntegrationRoute)
.filter(IntegrationRoute.event_type == normalized_event_type)
.filter(IntegrationRoute.provider == normalized_provider)
.filter(IntegrationRoute.recipient_email == normalized_email)
.first()
)
if route is None:
route = IntegrationRoute(
event_type=normalized_event_type,
provider=normalized_provider,
recipient_email=normalized_email,
)
db.add(route)
route.enabled = bool(enabled)
route.recipient_name = _clean_text(recipient_name)
route.subject_template = _clean_text(subject_template)
route.body_template = _clean_text(body_template)
route.provider_config_json = _serialize_json(normalized_provider_config)
db.commit()
db.refresh(route)
return _serialize_route(route)
finally:
db.close()
def sync_user_email_integration_routes(
*,
user_id: int,
recipient_email: str,
recipient_name: str | None = None,
) -> list[dict[str, Any]]:
normalized_user_id = _normalize_int(user_id)
normalized_email = _clean_text(recipient_email)
if normalized_user_id is None:
raise ValueError("user_id is required")
if not normalized_email:
raise ValueError("recipient_email is required")
dynamic_provider_config = {
"recipient_scope": USER_PROFILE_ROUTE_SCOPE,
}
db = SessionMockLocal()
try:
routes = (
db.query(IntegrationRoute)
.filter(IntegrationRoute.provider == "brevo_email")
.filter(IntegrationRoute.event_type.in_(USER_PROFILE_EVENT_TYPES))
.all()
)
changed = False
for route in routes:
provider_config = _deserialize_json(route.provider_config_json)
if _is_legacy_user_profile_route_config(provider_config) and route.enabled:
route.enabled = False
changed = True
if changed:
db.commit()
finally:
db.close()
synced_routes: list[dict[str, Any]] = []
for event_type in USER_PROFILE_EVENT_TYPES:
synced_routes.append(
upsert_email_integration_route(
event_type=event_type,
recipient_email=DYNAMIC_USER_ROUTE_EMAIL,
recipient_name=DYNAMIC_USER_ROUTE_NAME,
enabled=True,
provider_config=dynamic_provider_config,
)
)
return synced_routes
async def emit_business_event(event_type: str, payload: dict[str, Any] | None) -> list[dict[str, Any]]:
if not settings.integrations_enabled:
return []
normalized_event_type = _validate_event_type(event_type)
normalized_payload = dict(payload or {})
created_ids: list[int] = []
db = SessionMockLocal()
try:
routes = (
db.query(IntegrationRoute)
.filter(IntegrationRoute.event_type == normalized_event_type)
.filter(IntegrationRoute.enabled.is_(True))
.all()
)
for route in routes:
route_provider_config = _deserialize_json(route.provider_config_json)
recipient = _resolve_route_recipient(
db,
route=route,
payload=normalized_payload,
provider_config=route_provider_config,
)
if recipient is None:
continue
idempotency_key = _build_idempotency_key(
route_id=route.id,
event_type=normalized_event_type,
payload=normalized_payload,
)
existing = (
db.query(IntegrationDelivery)
.filter(IntegrationDelivery.idempotency_key == idempotency_key)
.first()
)
if existing is not None:
created_ids.append(existing.id)
continue
delivery = IntegrationDelivery(
route_id=route.id,
event_type=normalized_event_type,
provider=route.provider,
status="pending",
payload_json=_serialize_json(normalized_payload),
recipient_email=recipient.get("email"),
recipient_name=recipient.get("name"),
idempotency_key=idempotency_key,
)
db.add(delivery)
db.flush()
created_ids.append(delivery.id)
db.commit()
finally:
db.close()
if settings.integration_sync_delivery_enabled and created_ids:
dispatched: list[dict[str, Any]] = []
for delivery_id in created_ids:
delivered = await dispatch_delivery(delivery_id)
if delivered is not None:
dispatched.append(delivered)
return dispatched
if not created_ids:
return []
db = SessionMockLocal()
try:
rows = (
db.query(IntegrationDelivery)
.filter(IntegrationDelivery.id.in_(created_ids))
.order_by(IntegrationDelivery.id.asc())
.all()
)
return [_serialize_delivery(row) for row in rows]
finally:
db.close()
async def dispatch_delivery(delivery_id: int) -> dict[str, Any] | None:
db = SessionMockLocal()
try:
delivery = db.query(IntegrationDelivery).filter(IntegrationDelivery.id == delivery_id).first()
if delivery is None:
return None
if delivery.status == "sent":
return _serialize_delivery(delivery)
route = db.query(IntegrationRoute).filter(IntegrationRoute.id == delivery.route_id).first()
if route is None or not route.enabled:
delivery.status = "skipped"
delivery.last_error = "Route disabled or not found."
delivery.attempts = int(delivery.attempts or 0) + 1
db.commit()
db.refresh(delivery)
return _serialize_delivery(delivery)
payload = _deserialize_json(delivery.payload_json)
route_provider_config = _deserialize_json(route.provider_config_json)
subject, body = render_email_content(
event_type=delivery.event_type,
payload=payload,
subject_template=route.subject_template,
body_template=route.body_template,
)
delivery.rendered_subject = subject
delivery.rendered_body = body
delivery.attempts = int(delivery.attempts or 0) + 1
recipient_email = _clean_text(delivery.recipient_email)
recipient_name = _clean_text(delivery.recipient_name)
if not recipient_email:
recipient = _resolve_route_recipient(
db,
route=route,
payload=payload,
provider_config=route_provider_config,
)
if recipient is None:
delivery.status = "skipped"
delivery.last_error = "Recipient email unavailable for delivery."
db.commit()
db.refresh(delivery)
return _serialize_delivery(delivery)
recipient_email = recipient.get("email")
recipient_name = recipient.get("name")
delivery.recipient_email = recipient_email
delivery.recipient_name = recipient_name
try:
provider = _get_provider(route.provider)
result = await provider.send_email(
to_email=recipient_email,
to_name=recipient_name,
subject=subject,
body=body,
tags=[delivery.event_type],
provider_config=route_provider_config,
)
except IntegrationProviderError as exc:
delivery.status = "failed"
delivery.last_error = str(exc)
db.commit()
db.refresh(delivery)
return _serialize_delivery(delivery)
delivery.status = "sent"
delivery.last_error = None
delivery.provider_message_id = _clean_text(result.get("message_id"))
delivery.dispatched_at = utc_now()
db.commit()
db.refresh(delivery)
return _serialize_delivery(delivery)
finally:
db.close()
async def process_pending_deliveries(
*,
limit: int = 20,
delivery_ids: list[int] | None = None,
statuses: str | list[str] | tuple[str, ...] | None = None,
event_type: str | None = None,
provider: str | None = None,
route_id: int | None = None,
) -> list[dict[str, Any]]:
normalized_statuses = _normalize_delivery_statuses(statuses)
normalized_event_type = _validate_event_type(event_type) if _clean_text(event_type) else None
normalized_provider = _clean_text(provider)
normalized_route_id = int(route_id) if route_id is not None else None
normalized_delivery_ids = sorted({int(delivery_id) for delivery_id in delivery_ids or []})
if not normalized_delivery_ids and normalized_statuses is None:
normalized_statuses = ["pending", "failed"]
db = SessionMockLocal()
try:
query = db.query(IntegrationDelivery)
if normalized_delivery_ids:
query = query.filter(IntegrationDelivery.id.in_(normalized_delivery_ids))
else:
if normalized_statuses:
query = query.filter(IntegrationDelivery.status.in_(normalized_statuses))
if normalized_event_type:
query = query.filter(IntegrationDelivery.event_type == normalized_event_type)
if normalized_provider:
query = query.filter(IntegrationDelivery.provider == normalized_provider)
if normalized_route_id is not None:
query = query.filter(IntegrationDelivery.route_id == normalized_route_id)
query = query.order_by(IntegrationDelivery.id.asc())
if limit and int(limit) > 0:
query = query.limit(max(1, int(limit)))
if normalized_delivery_ids:
query = query.order_by(IntegrationDelivery.id.asc())
selected_ids = [row.id for row in query.all()]
finally:
db.close()
results: list[dict[str, Any]] = []
for delivery_id in selected_ids:
dispatched = await dispatch_delivery(delivery_id)
if dispatched is not None:
results.append(dispatched)
return results
async def publish_business_event_safely(event_type: str, payload: dict[str, Any] | None) -> list[dict[str, Any]]:
try:
return await emit_business_event(event_type, payload)
except Exception:
logger.exception(
"Falha ao publicar evento de integracao.",
extra={"event_type": event_type},
)
return []