You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
orquestrador/tests/test_integration_service.py

481 lines
18 KiB
Python

import contextlib
import os
import unittest
from unittest.mock import AsyncMock, patch

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.pool import StaticPool

# Kept ahead of the ``app`` imports on purpose.
# NOTE(review): presumably the app settings read DEBUG at import time - confirm.
os.environ.setdefault("DEBUG", "false")

from app.db.mock_database import MockBase
from app.db.mock_models import IntegrationDelivery, IntegrationRoute, User
from app.services.integrations.events import ORDER_CREATED_EVENT, REVIEW_SCHEDULED_EVENT
from app.services.integrations import service as integration_service
class IntegrationServiceTests(unittest.IsolatedAsyncioTestCase):
    """Tests for ``app.services.integrations.service`` backed by in-memory SQLite.

    Every test builds its own session factory and swaps it in for
    ``integration_service.SessionMockLocal``. Whenever a test expects an
    e-mail to be dispatched, ``integration_service._get_provider`` is patched
    to return a fake provider (or to raise).
    """

    # ------------------------------------------------------------------
    # Helpers
    # ------------------------------------------------------------------

    def _build_session_local(self):
        """Create a fresh in-memory SQLite database and return its session factory.

        ``StaticPool`` keeps a single connection for the engine, so every
        session created by the factory talks to the same in-memory database.
        The engine is disposed automatically when the test finishes.
        """
        engine = create_engine(
            "sqlite://",
            connect_args={"check_same_thread": False},
            poolclass=StaticPool,
        )
        SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
        MockBase.metadata.create_all(bind=engine)
        self.addCleanup(engine.dispose)
        return SessionLocal

    def _create_user(
        self,
        SessionLocal,
        *,
        user_id: int,
        email: str | None,
        name: str = "Cliente Teste",
    ) -> None:
        """Insert a Telegram ``User`` row with the given id, e-mail and name.

        Args:
            SessionLocal: session factory returned by ``_build_session_local``.
            user_id: primary key of the new user; also used to build the
                ``external_id`` (``tg-<user_id>``).
            email: e-mail stored on the user, or ``None`` for no e-mail.
            name: display name stored on the user.
        """
        with contextlib.closing(SessionLocal()) as db:
            db.add(
                User(
                    id=user_id,
                    channel="telegram",
                    external_id=f"tg-{user_id}",
                    name=name,
                    email=email,
                )
            )
            db.commit()

    @staticmethod
    def _make_fake_provider(message_id: str):
        """Return a stand-in provider whose awaited ``send_email`` yields ``message_id``.

        ``send_email`` is an ``AsyncMock`` so tests can inspect
        ``await_args`` / ``assert_awaited_once`` afterwards.
        """
        return type(
            "FakeProvider",
            (),
            {
                "send_email": AsyncMock(return_value={"message_id": message_id}),
            },
        )()

    @contextlib.contextmanager
    def _patched_service(
        self,
        session_factory,
        *,
        sync_delivery: bool | None = None,
        provider=None,
        provider_error: Exception | None = None,
    ):
        """Patch ``integration_service`` for the duration of a ``with`` block.

        Patches are entered in a fixed order (session factory, settings,
        provider) and all of them are undone when the block exits.

        Args:
            session_factory: replacement for
                ``integration_service.SessionMockLocal``.
            sync_delivery: when not ``None``, forces
                ``settings.integrations_enabled`` to ``True`` and
                ``settings.integration_sync_delivery_enabled`` to this value.
                When ``None`` the settings are left untouched.
            provider: object returned by the patched ``_get_provider``.
            provider_error: exception raised by the patched ``_get_provider``;
                only used when ``provider`` is ``None``.
        """
        with contextlib.ExitStack() as stack:
            stack.enter_context(
                patch.object(integration_service, "SessionMockLocal", session_factory)
            )
            if sync_delivery is not None:
                stack.enter_context(
                    patch.object(integration_service.settings, "integrations_enabled", True)
                )
                stack.enter_context(
                    patch.object(
                        integration_service.settings,
                        "integration_sync_delivery_enabled",
                        sync_delivery,
                    )
                )
            if provider is not None:
                stack.enter_context(
                    patch.object(integration_service, "_get_provider", return_value=provider)
                )
            elif provider_error is not None:
                stack.enter_context(
                    patch.object(integration_service, "_get_provider", side_effect=provider_error)
                )
            yield

    # ------------------------------------------------------------------
    # emit_business_event
    # ------------------------------------------------------------------

    async def test_emit_business_event_creates_and_dispatches_delivery(self):
        """With sync delivery on, emitting an event stores one delivery marked ``sent``."""
        session_factory = self._build_session_local()
        provider = self._make_fake_provider("brevo-123")

        with self._patched_service(session_factory, sync_delivery=True, provider=provider):
            integration_service.upsert_email_integration_route(
                event_type=ORDER_CREATED_EVENT,
                recipient_email="ops@example.com",
            )
            deliveries = await integration_service.emit_business_event(
                ORDER_CREATED_EVENT,
                {
                    "numero_pedido": "PED-1",
                    "modelo_veiculo": "Fiat Argo 2024",
                    "valor_veiculo": 67739.0,
                    "status": "Ativo",
                    "status_veiculo": "Reservado",
                },
            )

        self.assertEqual(len(deliveries), 1)
        self.assertEqual(deliveries[0]["status"], "sent")
        self.assertEqual(deliveries[0]["provider_message_id"], "brevo-123")
        self.assertEqual(deliveries[0]["recipient_email"], "ops@example.com")

        # The returned payload must match what was persisted.
        with contextlib.closing(session_factory()) as db:
            stored = db.query(IntegrationDelivery).one()
            self.assertEqual(stored.status, "sent")
            self.assertEqual(stored.provider_message_id, "brevo-123")
            self.assertEqual(stored.recipient_email, "ops@example.com")

    async def test_emit_business_event_passes_route_provider_config_to_provider(self):
        """The route's ``provider_config`` and the event tag reach ``send_email``."""
        session_factory = self._build_session_local()
        fake_provider = self._make_fake_provider("brevo-config")
        provider_config = {
            "sender": {"email": "noreply@example.com", "name": "Operacoes"},
            "reply_to": {"email": "atendimento@example.com", "name": "Atendimento"},
            "tags": ["ops"],
        }

        with self._patched_service(session_factory, sync_delivery=True, provider=fake_provider):
            integration_service.upsert_email_integration_route(
                event_type=ORDER_CREATED_EVENT,
                recipient_email="ops@example.com",
                provider_config=provider_config,
            )
            await integration_service.emit_business_event(
                ORDER_CREATED_EVENT,
                {
                    "numero_pedido": "PED-1",
                },
            )

        fake_provider.send_email.assert_awaited_once()
        kwargs = fake_provider.send_email.await_args.kwargs
        self.assertEqual(kwargs["provider_config"], provider_config)
        self.assertEqual(kwargs["tags"], [ORDER_CREATED_EVENT])

    async def test_emit_business_event_deduplicates_by_route_and_payload(self):
        """Emitting the same event and payload twice leaves a single pending delivery."""
        session_factory = self._build_session_local()

        with self._patched_service(session_factory, sync_delivery=False):
            integration_service.upsert_email_integration_route(
                event_type=ORDER_CREATED_EVENT,
                recipient_email="ops@example.com",
            )
            await integration_service.emit_business_event(ORDER_CREATED_EVENT, {"numero_pedido": "PED-1"})
            await integration_service.emit_business_event(ORDER_CREATED_EVENT, {"numero_pedido": "PED-1"})

        with contextlib.closing(session_factory()) as db:
            rows = db.query(IntegrationDelivery).all()
            self.assertEqual(len(rows), 1)
            self.assertEqual(rows[0].status, "pending")

    async def test_emit_business_event_skips_dynamic_user_route_without_saved_email(self):
        """No delivery is created when the referenced user has no stored e-mail."""
        session_factory = self._build_session_local()
        self._create_user(session_factory, user_id=7, email=None)

        with self._patched_service(session_factory, sync_delivery=False):
            integration_service.sync_user_email_integration_routes(
                user_id=7,
                recipient_email="cliente@example.com",
                recipient_name="Cliente Teste",
            )
            deliveries = await integration_service.emit_business_event(
                ORDER_CREATED_EVENT,
                {"numero_pedido": "PED-SEM-EMAIL", "user_id": 7},
            )

        self.assertEqual(deliveries, [])
        with contextlib.closing(session_factory()) as db:
            self.assertEqual(db.query(IntegrationDelivery).count(), 0)

    async def test_emit_business_event_resolves_dynamic_user_recipient_from_user_profile(self):
        """The dynamic user route sends to the e-mail and name stored on the user."""
        session_factory = self._build_session_local()
        self._create_user(session_factory, user_id=7, email="cliente@example.com", name="Cliente Teste")
        fake_provider = self._make_fake_provider("brevo-dynamic")

        with self._patched_service(session_factory, sync_delivery=True, provider=fake_provider):
            integration_service.sync_user_email_integration_routes(
                user_id=7,
                recipient_email="cliente@example.com",
                recipient_name="Cliente Teste",
            )
            deliveries = await integration_service.emit_business_event(
                ORDER_CREATED_EVENT,
                {"numero_pedido": "PED-DINAMICO", "user_id": 7},
            )

        self.assertEqual(len(deliveries), 1)
        self.assertEqual(deliveries[0]["status"], "sent")
        self.assertEqual(deliveries[0]["recipient_email"], "cliente@example.com")

        fake_provider.send_email.assert_awaited_once()
        kwargs = fake_provider.send_email.await_args.kwargs
        self.assertEqual(kwargs["to_email"], "cliente@example.com")
        self.assertEqual(kwargs["to_name"], "Cliente Teste")

        with contextlib.closing(session_factory()) as db:
            stored = db.query(IntegrationDelivery).one()
            self.assertEqual(stored.recipient_email, "cliente@example.com")
            self.assertEqual(stored.recipient_name, "Cliente Teste")

    # ------------------------------------------------------------------
    # sync_user_email_integration_routes
    # ------------------------------------------------------------------

    async def test_sync_user_email_integration_routes_creates_one_global_dynamic_route_per_event(self):
        """Syncing two different users still yields one dynamic route per event."""
        session_factory = self._build_session_local()

        with self._patched_service(session_factory):
            integration_service.sync_user_email_integration_routes(
                user_id=7,
                recipient_email="primeiro@example.com",
                recipient_name="Cliente Teste",
            )
            integration_service.sync_user_email_integration_routes(
                user_id=9,
                recipient_email="segundo@example.com",
                recipient_name="Outro Cliente",
            )

        with contextlib.closing(session_factory()) as db:
            routes = (
                db.query(IntegrationRoute)
                .filter(IntegrationRoute.event_type == ORDER_CREATED_EVENT)
                .order_by(IntegrationRoute.id.asc())
                .all()
            )
            self.assertEqual(len(routes), 1)
            self.assertEqual(routes[0].recipient_email, integration_service.DYNAMIC_USER_ROUTE_EMAIL)
            self.assertTrue(routes[0].enabled)
            self.assertEqual(
                routes[0].provider_config_json,
                integration_service._serialize_json({"recipient_scope": integration_service.USER_PROFILE_ROUTE_SCOPE}),
            )

    async def test_sync_user_email_integration_routes_disables_legacy_user_specific_routes(self):
        """A pre-existing per-user route is disabled once the dynamic route is synced."""
        session_factory = self._build_session_local()

        with self._patched_service(session_factory):
            # Legacy shape: a concrete recipient tied to a user via provider_config.
            integration_service.upsert_email_integration_route(
                event_type=ORDER_CREATED_EVENT,
                recipient_email="legacy@example.com",
                enabled=True,
                provider_config={
                    "recipient_scope": integration_service.USER_PROFILE_ROUTE_SCOPE,
                    "user_id": 7,
                },
            )
            integration_service.sync_user_email_integration_routes(
                user_id=7,
                recipient_email="cliente@example.com",
                recipient_name="Cliente Teste",
            )

        with contextlib.closing(session_factory()) as db:
            routes = (
                db.query(IntegrationRoute)
                .filter(IntegrationRoute.event_type == ORDER_CREATED_EVENT)
                .order_by(IntegrationRoute.id.asc())
                .all()
            )
            self.assertEqual(len(routes), 2)
            self.assertFalse(routes[0].enabled)
            self.assertTrue(routes[1].enabled)
            self.assertEqual(routes[1].recipient_email, integration_service.DYNAMIC_USER_ROUTE_EMAIL)

    # ------------------------------------------------------------------
    # process_pending_deliveries / listing
    # ------------------------------------------------------------------

    async def test_process_pending_deliveries_marks_failure_when_provider_fails(self):
        """A provider error while processing marks the delivery ``failed`` with the message."""
        session_factory = self._build_session_local()

        with self._patched_service(session_factory, sync_delivery=False):
            integration_service.upsert_email_integration_route(
                event_type=ORDER_CREATED_EVENT,
                recipient_email="ops@example.com",
            )
            await integration_service.emit_business_event(ORDER_CREATED_EVENT, {"numero_pedido": "PED-1"})

        # Settings are deliberately NOT patched for the processing step.
        with self._patched_service(
            session_factory,
            provider_error=integration_service.IntegrationProviderError("brevo offline"),
        ):
            deliveries = await integration_service.process_pending_deliveries(limit=10)

        self.assertEqual(len(deliveries), 1)
        self.assertEqual(deliveries[0]["status"], "failed")
        self.assertIn("brevo offline", deliveries[0]["last_error"])

    async def test_list_integration_deliveries_filters_by_status_and_event(self):
        """Listing with ``statuses`` and ``event_type`` returns only the matching delivery."""
        session_factory = self._build_session_local()

        with self._patched_service(session_factory, sync_delivery=False):
            integration_service.upsert_email_integration_route(
                event_type=ORDER_CREATED_EVENT,
                recipient_email="ops@example.com",
            )
            integration_service.upsert_email_integration_route(
                event_type=REVIEW_SCHEDULED_EVENT,
                recipient_email="review@example.com",
            )
            await integration_service.emit_business_event(ORDER_CREATED_EVENT, {"numero_pedido": "PED-1"})
            await integration_service.emit_business_event(REVIEW_SCHEDULED_EVENT, {"protocolo": "REV-1"})

        # Force the order delivery into the "failed" state directly in the DB.
        with contextlib.closing(session_factory()) as db:
            order_delivery = (
                db.query(IntegrationDelivery)
                .filter(IntegrationDelivery.event_type == ORDER_CREATED_EVENT)
                .one()
            )
            order_delivery.status = "failed"
            db.commit()

        with self._patched_service(session_factory):
            deliveries = integration_service.list_integration_deliveries(
                statuses=["failed"],
                event_type=ORDER_CREATED_EVENT,
                limit=10,
            )

        self.assertEqual(len(deliveries), 1)
        self.assertEqual(deliveries[0]["event_type"], ORDER_CREATED_EVENT)
        self.assertEqual(deliveries[0]["status"], "failed")
        self.assertEqual(deliveries[0]["recipient_email"], "ops@example.com")

    async def test_process_pending_deliveries_respects_status_and_event_filters(self):
        """Only deliveries matching the status/event filters are processed."""
        session_factory = self._build_session_local()

        with self._patched_service(session_factory, sync_delivery=False):
            integration_service.upsert_email_integration_route(
                event_type=ORDER_CREATED_EVENT,
                recipient_email="ops@example.com",
            )
            integration_service.upsert_email_integration_route(
                event_type=REVIEW_SCHEDULED_EVENT,
                recipient_email="review@example.com",
            )
            await integration_service.emit_business_event(ORDER_CREATED_EVENT, {"numero_pedido": "PED-1"})
            await integration_service.emit_business_event(REVIEW_SCHEDULED_EVENT, {"protocolo": "REV-1"})

        fake_provider = self._make_fake_provider("brevo-filtered")
        with self._patched_service(session_factory, provider=fake_provider):
            deliveries = await integration_service.process_pending_deliveries(
                statuses=["pending"],
                event_type=REVIEW_SCHEDULED_EVENT,
                limit=10,
            )

        self.assertEqual(len(deliveries), 1)
        self.assertEqual(deliveries[0]["event_type"], REVIEW_SCHEDULED_EVENT)
        self.assertEqual(deliveries[0]["status"], "sent")

        with contextlib.closing(session_factory()) as db:
            rows = (
                db.query(IntegrationDelivery)
                .order_by(IntegrationDelivery.event_type.asc(), IntegrationDelivery.id.asc())
                .all()
            )
            # Ordered by event type: the first row was filtered out and stays
            # pending, the second one was processed.
            self.assertEqual(rows[0].status, "pending")
            self.assertEqual(rows[1].status, "sent")

    def test_list_integration_routes_filters_disabled_routes(self):
        """``list_integration_routes(enabled=False)`` returns only disabled routes."""
        session_factory = self._build_session_local()

        with self._patched_service(session_factory):
            integration_service.upsert_email_integration_route(
                event_type=ORDER_CREATED_EVENT,
                recipient_email="ops@example.com",
                enabled=True,
            )
            integration_service.upsert_email_integration_route(
                event_type=REVIEW_SCHEDULED_EVENT,
                recipient_email="review@example.com",
                enabled=False,
            )
            routes = integration_service.list_integration_routes(enabled=False)

        self.assertEqual(len(routes), 1)
        self.assertEqual(routes[0]["event_type"], REVIEW_SCHEDULED_EVENT)
        self.assertFalse(routes[0]["enabled"])
# Run the suite with unittest's CLI runner when this file is executed directly.
if __name__ == "__main__":
    unittest.main()