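# NOTE: the lines below are repository web-UI residue captured together with the
# source; they are kept as comments so that the module remains valid Python.
# You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
# orquestrador/app/services/integrations/service.py
#
# 328 lines
# 11 KiB
# Python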

import hashlib
import json
import logging
from typing import Any
from sqlalchemy import or_
from app.core.settings import settings
from app.core.time_utils import utc_now
from app.db.mock_database import SessionMockLocal
from app.db.mock_models import IntegrationDelivery, IntegrationRoute
from app.services.integrations.events import SUPPORTED_EVENT_TYPES
from app.services.integrations.providers import BrevoEmailProvider, IntegrationProviderError
from app.services.integrations.templates import render_email_content
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)

# Maps the provider name stored on a route to the callable that builds the
# provider object; ``_get_provider`` looks names up here and calls the result
# with no arguments.
_PROVIDER_FACTORIES = {"brevo_email": BrevoEmailProvider}
def _clean_text(value: str | None) -> str | None:
text = str(value or "").strip()
return text or None
def _serialize_json(value: dict[str, Any] | None) -> str:
return json.dumps(value or {}, ensure_ascii=True, sort_keys=True, separators=(",", ":"), default=str)
def _deserialize_json(value: str | None) -> dict[str, Any]:
text = str(value or "").strip()
if not text:
return {}
try:
payload = json.loads(text)
except (TypeError, ValueError):
return {}
return payload if isinstance(payload, dict) else {}
def _validate_event_type(event_type: str) -> str:
    """Return the trimmed ``event_type`` if it is a supported integration event.

    Raises:
        ValueError: if the trimmed value is not in ``SUPPORTED_EVENT_TYPES``
            (this includes empty or blank input).
    """
    candidate = _clean_text(event_type)
    if candidate in SUPPORTED_EVENT_TYPES:
        return candidate
    # The message reports the caller's raw value, not the trimmed one.
    raise ValueError(f"unsupported integration event: {event_type}")
def _build_idempotency_key(*, route_id: int, event_type: str, payload: dict[str, Any]) -> str:
    """Return the hex SHA-256 digest identifying one (route, event, payload) triple.

    The payload goes through ``_serialize_json``, which sorts keys, so two
    dicts with the same content produce the same key regardless of insertion
    order.
    """
    serialized_payload = _serialize_json(payload)
    material = "{}:{}:{}".format(route_id, event_type, serialized_payload)
    digest = hashlib.sha256()
    digest.update(material.encode("utf-8"))
    return digest.hexdigest()
def _serialize_route(route: IntegrationRoute) -> dict[str, Any]:
    """Convert an ``IntegrationRoute`` row into a plain dict.

    ``enabled`` is coerced to ``bool`` and the stored ``provider_config_json``
    text is decoded into a dict under the ``provider_config`` key.
    """
    return dict(
        id=route.id,
        event_type=route.event_type,
        provider=route.provider,
        enabled=bool(route.enabled),
        recipient_email=route.recipient_email,
        recipient_name=route.recipient_name,
        subject_template=route.subject_template,
        body_template=route.body_template,
        provider_config=_deserialize_json(route.provider_config_json),
    )
def _serialize_delivery(delivery: IntegrationDelivery) -> dict[str, Any]:
    """Convert an ``IntegrationDelivery`` row into a plain dict.

    ``attempts`` is coerced to ``int`` (missing counts become ``0``), the
    stored ``payload_json`` text is decoded under the ``payload`` key, and the
    two timestamps are rendered with ``isoformat()`` or left as ``None``.
    """

    def _iso_or_none(moment):
        # Unset timestamps stay None instead of failing on .isoformat().
        return moment.isoformat() if moment else None

    return dict(
        id=delivery.id,
        route_id=delivery.route_id,
        event_type=delivery.event_type,
        provider=delivery.provider,
        status=delivery.status,
        attempts=int(delivery.attempts or 0),
        payload=_deserialize_json(delivery.payload_json),
        rendered_subject=delivery.rendered_subject,
        rendered_body=delivery.rendered_body,
        provider_message_id=delivery.provider_message_id,
        last_error=delivery.last_error,
        idempotency_key=delivery.idempotency_key,
        dispatched_at=_iso_or_none(delivery.dispatched_at),
        created_at=_iso_or_none(delivery.created_at),
    )
def _get_provider(provider_name: str):
    """Build the provider registered under ``provider_name``.

    The name is trimmed before the lookup in ``_PROVIDER_FACTORIES``.

    Raises:
        IntegrationProviderError: if no factory is registered for the name.
    """
    lookup_key = _clean_text(provider_name) or ""
    factory = _PROVIDER_FACTORIES.get(lookup_key)
    if factory:
        return factory()
    raise IntegrationProviderError(f"Provider de integracao nao suportado: {provider_name}")
def list_integration_routes(*, event_type: str | None = None, provider: str | None = None) -> list[dict[str, Any]]:
    """List integration routes as dicts, ordered by event type and then id.

    Args:
        event_type: when non-blank, only routes for this event type are returned.
        provider: when non-blank, only routes for this provider are returned.
    """
    session = SessionMockLocal()
    try:
        # Collect only the filters the caller actually supplied.
        criteria = []
        wanted_event_type = _clean_text(event_type)
        if wanted_event_type:
            criteria.append(IntegrationRoute.event_type == wanted_event_type)
        wanted_provider = _clean_text(provider)
        if wanted_provider:
            criteria.append(IntegrationRoute.provider == wanted_provider)

        query = session.query(IntegrationRoute)
        for criterion in criteria:
            query = query.filter(criterion)
        query = query.order_by(IntegrationRoute.event_type.asc(), IntegrationRoute.id.asc())
        return list(map(_serialize_route, query.all()))
    finally:
        session.close()
def upsert_email_integration_route(
    *,
    event_type: str,
    recipient_email: str,
    recipient_name: str | None = None,
    subject_template: str | None = None,
    body_template: str | None = None,
    enabled: bool = True,
    provider: str = "brevo_email",
    provider_config: dict[str, Any] | None = None,
) -> dict[str, Any]:
    """Create or update the e-mail route for an (event, provider, recipient) triple.

    A route is looked up by trimmed event type, provider and recipient e-mail.
    If none exists a new one is added. In both cases the remaining fields are
    overwritten with the supplied values and the change is committed.

    Returns:
        The stored route as a dict.

    Raises:
        ValueError: if ``event_type`` is unsupported or ``recipient_email`` is
            empty or blank.
    """
    normalized_event_type = _validate_event_type(event_type)
    # A blank provider falls back to the default provider name.
    normalized_provider = _clean_text(provider) or "brevo_email"
    normalized_email = _clean_text(recipient_email)
    if normalized_email is None:
        raise ValueError("recipient_email is required")

    session = SessionMockLocal()
    try:
        route = (
            session.query(IntegrationRoute)
            .filter(
                IntegrationRoute.event_type == normalized_event_type,
                IntegrationRoute.provider == normalized_provider,
                IntegrationRoute.recipient_email == normalized_email,
            )
            .first()
        )
        if route is None:
            route = IntegrationRoute(
                event_type=normalized_event_type,
                provider=normalized_provider,
                recipient_email=normalized_email,
            )
            session.add(route)

        # These fields are always replaced, for new and existing routes alike.
        updates = {
            "enabled": bool(enabled),
            "recipient_name": _clean_text(recipient_name),
            "subject_template": _clean_text(subject_template),
            "body_template": _clean_text(body_template),
            "provider_config_json": _serialize_json(provider_config),
        }
        for attribute, new_value in updates.items():
            setattr(route, attribute, new_value)

        session.commit()
        session.refresh(route)
        return _serialize_route(route)
    finally:
        session.close()
async def emit_business_event(event_type: str, payload: dict[str, Any] | None) -> list[dict[str, Any]]:
    """Record deliveries for a business event and optionally dispatch them.

    For every enabled route registered for ``event_type`` a ``pending``
    delivery is stored, unless a delivery with the same idempotency key
    (route, event type, payload) already exists, in which case that one is
    reused. When ``settings.integration_sync_delivery_enabled`` is set, each
    delivery is dispatched immediately via ``dispatch_delivery``.

    Returns:
        The affected deliveries as dicts; an empty list when integrations are
        disabled or no enabled route matches.

    Raises:
        ValueError: if integrations are enabled and ``event_type`` is unsupported.
    """
    if not settings.integrations_enabled:
        return []

    normalized_event_type = _validate_event_type(event_type)
    normalized_payload = dict(payload or {})
    delivery_ids: list[int] = []

    session = SessionMockLocal()
    try:
        active_routes = (
            session.query(IntegrationRoute)
            .filter(
                IntegrationRoute.event_type == normalized_event_type,
                IntegrationRoute.enabled.is_(True),
            )
            .all()
        )
        for route in active_routes:
            key = _build_idempotency_key(
                route_id=route.id,
                event_type=normalized_event_type,
                payload=normalized_payload,
            )
            delivery = (
                session.query(IntegrationDelivery)
                .filter(IntegrationDelivery.idempotency_key == key)
                .first()
            )
            if delivery is None:
                delivery = IntegrationDelivery(
                    route_id=route.id,
                    event_type=normalized_event_type,
                    provider=route.provider,
                    status="pending",
                    payload_json=_serialize_json(normalized_payload),
                    idempotency_key=key,
                )
                session.add(delivery)
                # Flush so the new row has its primary key before we record it.
                session.flush()
            delivery_ids.append(delivery.id)
        session.commit()
    finally:
        session.close()

    sync_delivery = settings.integration_sync_delivery_enabled
    if not delivery_ids:
        return []

    if sync_delivery:
        # Dispatch one at a time, in creation order; drop deliveries that vanished.
        outcomes = [await dispatch_delivery(delivery_id) for delivery_id in delivery_ids]
        return [outcome for outcome in outcomes if outcome is not None]

    # Asynchronous mode: report the stored rows using a fresh session.
    session = SessionMockLocal()
    try:
        stored = (
            session.query(IntegrationDelivery)
            .filter(IntegrationDelivery.id.in_(delivery_ids))
            .order_by(IntegrationDelivery.id.asc())
            .all()
        )
        return list(map(_serialize_delivery, stored))
    finally:
        session.close()
async def dispatch_delivery(delivery_id: int) -> dict[str, Any] | None:
    """Attempt to send one stored delivery through its route's provider.

    Outcomes, as reflected in the returned dict's ``status``:

    * ``sent`` deliveries are returned unchanged and are not sent again.
    * If the route is missing or disabled the delivery becomes ``skipped``.
    * If the provider raises ``IntegrationProviderError`` the delivery becomes
      ``failed`` and the error text is stored in ``last_error``.
    * Otherwise the delivery becomes ``sent`` and the provider message id and
      dispatch time are recorded.

    Every attempt other than the already-sent shortcut increments ``attempts``.

    Returns:
        The delivery as a dict, or ``None`` when no delivery has this id.
    """
    session = SessionMockLocal()

    def _persist(record):
        # Commit the pending changes and return the row's refreshed state.
        session.commit()
        session.refresh(record)
        return _serialize_delivery(record)

    try:
        delivery = (
            session.query(IntegrationDelivery)
            .filter(IntegrationDelivery.id == delivery_id)
            .first()
        )
        if delivery is None:
            return None
        if delivery.status == "sent":
            return _serialize_delivery(delivery)

        route = (
            session.query(IntegrationRoute)
            .filter(IntegrationRoute.id == delivery.route_id)
            .first()
        )
        route_usable = route is not None and route.enabled
        if not route_usable:
            delivery.status = "skipped"
            delivery.last_error = "Route disabled or not found."
            delivery.attempts = int(delivery.attempts or 0) + 1
            return _persist(delivery)

        subject, body = render_email_content(
            event_type=delivery.event_type,
            payload=_deserialize_json(delivery.payload_json),
            subject_template=route.subject_template,
            body_template=route.body_template,
        )
        delivery.rendered_subject = subject
        delivery.rendered_body = body
        delivery.attempts = int(delivery.attempts or 0) + 1

        try:
            provider = _get_provider(route.provider)
            result = await provider.send_email(
                to_email=route.recipient_email,
                to_name=route.recipient_name,
                subject=subject,
                body=body,
                tags=[delivery.event_type],
            )
        except IntegrationProviderError as exc:
            delivery.status = "failed"
            delivery.last_error = str(exc)
        else:
            delivery.status = "sent"
            delivery.last_error = None
            delivery.provider_message_id = _clean_text(result.get("message_id"))
            delivery.dispatched_at = utc_now()
        return _persist(delivery)
    finally:
        session.close()
async def process_pending_deliveries(
    *,
    limit: int = 20,
    delivery_ids: list[int] | None = None,
) -> list[dict[str, Any]]:
    """Dispatch a batch of deliveries, oldest (lowest id) first.

    Args:
        limit: maximum number of deliveries to process; a falsy or
            non-positive value disables the cap.
        delivery_ids: when non-empty, exactly these deliveries are selected
            regardless of status; otherwise every ``pending`` or ``failed``
            delivery is a candidate.

    Returns:
        The dispatch result of each selected delivery, as dicts, skipping any
        delivery that no longer exists when it is dispatched.
    """
    db = SessionMockLocal()
    try:
        query = db.query(IntegrationDelivery)
        if delivery_ids:
            query = query.filter(IntegrationDelivery.id.in_(delivery_ids))
        else:
            query = query.filter(
                or_(
                    IntegrationDelivery.status == "pending",
                    IntegrationDelivery.status == "failed",
                )
            )
        # Order exactly once and BEFORE limiting: SQLAlchemy's Query raises
        # InvalidRequestError if order_by() is called after limit(), which is
        # what happened previously whenever delivery_ids was given together
        # with a positive limit (the default).
        query = query.order_by(IntegrationDelivery.id.asc())
        if limit and int(limit) > 0:
            query = query.limit(int(limit))
        # Only ids are kept so the session can be closed before dispatching;
        # dispatch_delivery opens its own session per delivery.
        selected_ids = [row.id for row in query.all()]
    finally:
        db.close()

    results: list[dict[str, Any]] = []
    for delivery_id in selected_ids:
        dispatched = await dispatch_delivery(delivery_id)
        if dispatched is not None:
            results.append(dispatched)
    return results
async def publish_business_event_safely(event_type: str, payload: dict[str, Any] | None) -> list[dict[str, Any]]:
    """Emit a business event without ever raising to the caller.

    Delegates to ``emit_business_event``. Any ``Exception`` is logged with its
    traceback (and the event type as extra context) and an empty list is
    returned instead.
    """
    try:
        deliveries = await emit_business_event(event_type, payload)
    except Exception:
        # Deliberate catch-all: publishing is best-effort for the caller.
        logger.exception(
            "Falha ao publicar evento de integracao.",
            extra={"event_type": event_type},
        )
        deliveries = []
    return deliveries