You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
468 lines
18 KiB
Python
468 lines
18 KiB
Python
import unittest
|
|
import asyncio
|
|
from types import SimpleNamespace
|
|
from unittest.mock import AsyncMock, patch
|
|
|
|
import aiohttp
|
|
from fastapi import HTTPException
|
|
|
|
from app.integrations.telegram_satellite_service import (
|
|
TELEGRAM_RUNTIME_BUCKET,
|
|
TELEGRAM_RUNTIME_OWNER_ID,
|
|
TelegramSatelliteService,
|
|
)
|
|
from app.services.orchestration.conversation_state_store import ConversationStateStore
|
|
|
|
|
|
class _DummySession:
|
|
def close(self):
|
|
return None
|
|
|
|
|
|
class _FakeTelegramResponse:
|
|
def __init__(self, payload):
|
|
self.payload = payload
|
|
|
|
async def __aenter__(self):
|
|
return self
|
|
|
|
async def __aexit__(self, exc_type, exc, tb):
|
|
return None
|
|
|
|
async def json(self):
|
|
return self.payload
|
|
|
|
|
|
class _FakeTelegramSession:
    """HTTP session double: records each ``post`` and always answers the same payload."""

    def __init__(self, payload):
        self.calls = []  # one ``(url, json)`` tuple per ``post`` call, in call order
        self.payload = payload

    def post(self, url, json):
        """Record the request and return a fake response carrying the fixed payload."""
        request = (url, json)
        self.calls.append(request)
        return _FakeTelegramResponse(self.payload)
|
|
|
|
|
|
class _FlakyTelegramResponse:
|
|
def __init__(self, outcome):
|
|
self.outcome = outcome
|
|
|
|
async def __aenter__(self):
|
|
if isinstance(self.outcome, BaseException):
|
|
raise self.outcome
|
|
return self
|
|
|
|
async def __aexit__(self, exc_type, exc, tb):
|
|
return None
|
|
|
|
async def json(self):
|
|
return self.outcome
|
|
|
|
|
|
class _FlakyTelegramSession:
    """HTTP session double that replays a scripted sequence of outcomes.

    Each ``post`` consumes the next scripted outcome; once the script is
    exhausted every further call answers ``{"ok": True}``.
    """

    def __init__(self, outcomes):
        self.calls = []  # one ``(url, json)`` tuple per ``post`` call, in call order
        self.outcomes = list(outcomes)  # private copy; consumed from the front

    def post(self, url, json):
        """Record the request and wrap the next outcome in a flaky response."""
        self.calls.append((url, json))
        next_outcome = self.outcomes.pop(0) if self.outcomes else {"ok": True}
        return _FlakyTelegramResponse(next_outcome)
|
|
|
|
|
|
class TelegramMultimodalTests(unittest.IsolatedAsyncioTestCase):
    """Async tests for ``TelegramSatelliteService``.

    The tests drive the service through its private methods
    (``_process_message``, ``_handle_update``, ``_initialize_offset``,
    ``_send_message``, ``_persist_last_processed_update_id`` and
    ``_schedule_update_processing``) with collaborators replaced by mocks.
    """

    # Dotted path of the module under test; prefix for every ``patch`` target.
    _SERVICE_MODULE = "app.integrations.telegram_satellite_service"

    def _build_service(self) -> TelegramSatelliteService:
        """Create a service with a fresh state store and remember it for teardown."""
        service = TelegramSatelliteService(
            "token-teste",
            state_repository=ConversationStateStore(),
        )
        self._service_under_test = service
        return service

    async def asyncTearDown(self):
        """Shut down the chat workers of the service built by the test, if any."""
        service = getattr(self, "_service_under_test", None)
        if service is not None:
            await service._shutdown_chat_workers()

    @staticmethod
    def _text_update(update_id, text, *, message_id=None):
        """Build a text update for chat/user 99; ``message_id`` is omitted when ``None``."""
        message = {} if message_id is None else {"message_id": message_id}
        message.update({"chat": {"id": 99}, "from": {"id": 99}, "text": text})
        return {"update_id": update_id, "message": message}

    async def _run_image_turn(self, *, text, sender, extracted_message, handle_message):
        """Run ``_process_message`` for one image attachment with collaborators stubbed.

        Args:
            text: Caption/text sent along with the image.
            sender: Telegram ``from`` payload of the message.
            extracted_message: Value the patched
                ``_build_orchestration_message_from_image`` returns.
            handle_message: ``AsyncMock`` installed as the orchestrator's
                ``handle_message`` so the caller can inspect its awaits.

        Returns:
            Whatever ``_process_message`` returned.
        """
        service = self._build_service()
        module = self._SERVICE_MODULE

        tools_db_patch = patch(f"{module}.SessionLocal", return_value=_DummySession())
        mock_db_patch = patch(f"{module}.SessionMockLocal", return_value=_DummySession())
        users_patch = patch(f"{module}.UserService")
        orchestrator_patch = patch(f"{module}.OrquestradorService")
        image_patch = patch.object(
            service,
            "_build_orchestration_message_from_image",
            AsyncMock(return_value=extracted_message),
        )

        with tools_db_patch, mock_db_patch, image_patch:
            with users_patch as user_service_cls, orchestrator_patch as orchestrator_cls:
                user_service_cls.return_value.get_or_create.return_value = SimpleNamespace(id=7)
                orchestrator_cls.return_value.handle_message = handle_message
                return await service._process_message(
                    text=text,
                    sender=sender,
                    chat_id=99,
                    image_attachments=[{"mime_type": "image/jpeg", "data": b"123"}],
                )

    async def test_process_message_uses_extracted_image_message(self):
        """The text extracted from the image is what reaches the orchestrator."""
        handle_message = AsyncMock(return_value="ok")

        answer = await self._run_image_turn(
            text="segue o comprovante",
            sender={"id": 99, "first_name": "Vitor"},
            extracted_message=(
                "[imagem recebida no telegram]\n"
                "Dados extraidos da imagem: Registrar pagamento de aluguel: placa ABC1D23; "
                "valor 293,47; data_pagamento 17/03/2026 14:30; identificador_comprovante NSU123."
            ),
            handle_message=handle_message,
        )

        self.assertEqual(answer, "ok")
        handle_message.assert_awaited_once()
        kwargs = handle_message.await_args.kwargs
        self.assertIn("Registrar pagamento de aluguel", kwargs["message"])
        self.assertEqual(kwargs["user_id"], 7)

    async def test_process_message_returns_direct_failure_for_unreadable_image(self):
        """An unreadable-image message is returned directly, bypassing the orchestrator."""
        handle_message = AsyncMock()

        answer = await self._run_image_turn(
            text="",
            sender={"id": 99},
            extracted_message=(
                "Nao consegui identificar os dados da imagem. "
                "Descreva o documento ou envie uma foto mais nitida."
            ),
            handle_message=handle_message,
        )

        self.assertIn("Nao consegui identificar os dados da imagem", answer)
        handle_message.assert_not_awaited()

    async def test_process_message_returns_direct_failure_for_receipt_without_watermark(self):
        """An invalid-receipt message is returned directly, bypassing the orchestrator."""
        handle_message = AsyncMock()

        answer = await self._run_image_turn(
            text="segue o comprovante",
            sender={"id": 99},
            extracted_message=(
                "O comprovante enviado nao e valido. "
                "Envie um comprovante valido com a marca d'agua SysaltiIA visivel."
            ),
            handle_message=handle_message,
        )

        self.assertIn("marca d'agua SysaltiIA visivel", answer)
        handle_message.assert_not_awaited()

    async def test_process_message_offloads_blocking_turn_to_worker_thread(self):
        """A text-only turn is dispatched exactly once through ``asyncio.to_thread``."""
        service = self._build_service()
        to_thread_patch = patch(
            f"{self._SERVICE_MODULE}.asyncio.to_thread",
            AsyncMock(return_value="ok"),
        )

        with to_thread_patch as to_thread:
            answer = await service._process_message(
                text="quero ver a frota",
                sender={"id": 99, "first_name": "Vitor"},
                chat_id=99,
                image_attachments=[],
            )

        self.assertEqual(answer, "ok")
        self.assertEqual(to_thread.await_count, 1)
        call_kwargs = to_thread.await_args.kwargs
        self.assertEqual(call_kwargs["message_text"], "quero ver a frota")
        self.assertEqual(call_kwargs["chat_id"], 99)
        self.assertEqual(call_kwargs["sender"]["id"], 99)

    async def test_handle_update_masks_sensitive_domain_error_in_logs(self):
        """Identifiers in an ``HTTPException`` detail are masked in the logged warning."""
        service = self._build_service()
        update = self._text_update(1, "segue o pagamento")
        domain_error = HTTPException(
            status_code=409,
            detail={
                "cpf": "12345678909",
                "placa": "ABC1D23",
                "external_id": "987654321",
                "identificador_comprovante": "NSU123",
            },
        )

        no_images = patch.object(service, "_extract_image_attachments", AsyncMock(return_value=[]))
        failing_turn = patch.object(service, "_process_message", AsyncMock(side_effect=domain_error))
        silent_send = patch.object(service, "_send_message", AsyncMock())
        warning_spy = patch(f"{self._SERVICE_MODULE}.logger.warning")

        with no_images, failing_turn, silent_send, warning_spy as logger_warning:
            await service._handle_update(session=SimpleNamespace(), update=update)

        self.assertTrue(logger_warning.called)
        # The detail is inspected as the second positional argument of the warning call.
        logged_detail = str(logger_warning.call_args.args[1])
        for raw_value in ("12345678909", "ABC1D23", "987654321", "NSU123"):
            self.assertNotIn(raw_value, logged_detail)
        for masked_value in ("***.***.***-09", "ABC***3", "******321", "***123"):
            self.assertIn(masked_value, logged_detail)

    async def test_handle_update_reuses_cached_answer_for_duplicate_message(self):
        """Re-delivering the same update re-sends the answer without reprocessing."""
        service = self._build_service()
        update = self._text_update(10, "quero ver a frota", message_id=77)

        no_images = patch.object(service, "_extract_image_attachments", AsyncMock(return_value=[]))
        stub_turn = patch.object(
            service,
            "_process_message",
            AsyncMock(return_value="Segue a frota disponivel."),
        )
        stub_send = patch.object(service, "_send_message", AsyncMock())

        with no_images, stub_turn as process_message, stub_send as send_message:
            await service._handle_update(session=SimpleNamespace(), update=update)
            await service._handle_update(session=SimpleNamespace(), update=update)

        self.assertEqual(process_message.await_count, 1)
        self.assertEqual(send_message.await_count, 2)
        first_text = send_message.await_args_list[0].kwargs["text"]
        second_text = send_message.await_args_list[1].kwargs["text"]
        self.assertEqual(first_text, "Segue a frota disponivel.")
        self.assertEqual(second_text, "Segue a frota disponivel.")

    async def test_handle_update_processes_same_text_again_when_message_id_changes(self):
        """Identical text under a new ``message_id`` is processed as a new turn."""
        service = self._build_service()
        first_update = self._text_update(10, "quero ver a frota", message_id=77)
        second_update = self._text_update(11, "quero ver a frota", message_id=78)

        no_images = patch.object(service, "_extract_image_attachments", AsyncMock(return_value=[]))
        stub_turn = patch.object(
            service,
            "_process_message",
            AsyncMock(side_effect=["Resposta 1", "Resposta 2"]),
        )
        stub_send = patch.object(service, "_send_message", AsyncMock())

        with no_images, stub_turn as process_message, stub_send as send_message:
            await service._handle_update(session=SimpleNamespace(), update=first_update)
            await service._handle_update(session=SimpleNamespace(), update=second_update)

        self.assertEqual(process_message.await_count, 2)
        self.assertEqual(send_message.await_count, 2)
        self.assertEqual(send_message.await_args_list[0].kwargs["text"], "Resposta 1")
        self.assertEqual(send_message.await_args_list[1].kwargs["text"], "Resposta 2")

    async def test_initialize_offset_uses_persisted_cursor(self):
        """With a stored ``last_update_id`` of 41 the returned offset is 42."""
        service = self._build_service()
        service.state.set_entry(
            TELEGRAM_RUNTIME_BUCKET,
            TELEGRAM_RUNTIME_OWNER_ID,
            {"last_update_id": 41},
        )

        offset = await service._initialize_offset(session=SimpleNamespace())

        self.assertEqual(offset, 42)
        self.assertEqual(service._last_update_id, 41)

    async def test_initialize_offset_bootstraps_cursor_once_when_missing(self):
        """Without a stored cursor, one request seeds it from the highest update id."""
        service = self._build_service()
        session = _FakeTelegramSession(
            {
                "ok": True,
                "result": [
                    {"update_id": 5},
                    {"update_id": 6},
                ],
            }
        )

        offset = await service._initialize_offset(session=session)

        self.assertEqual(offset, 7)
        self.assertEqual(service._last_update_id, 6)
        entry = service.state.get_entry(TELEGRAM_RUNTIME_BUCKET, TELEGRAM_RUNTIME_OWNER_ID)
        self.assertEqual(entry["last_update_id"], 6)
        self.assertEqual(len(session.calls), 1)

    async def test_handle_update_persists_runtime_cursor(self):
        """Handling an update stores its ``update_id`` in the runtime state entry."""
        service = self._build_service()
        update = self._text_update(14, "status do pedido", message_id=88)

        no_images = patch.object(service, "_extract_image_attachments", AsyncMock(return_value=[]))
        stub_turn = patch.object(
            service,
            "_process_message",
            AsyncMock(return_value="Pedido encontrado."),
        )
        stub_send = patch.object(service, "_send_message", AsyncMock())

        with no_images, stub_turn, stub_send:
            await service._handle_update(session=SimpleNamespace(), update=update)

        entry = service.state.get_entry(TELEGRAM_RUNTIME_BUCKET, TELEGRAM_RUNTIME_OWNER_ID)
        self.assertEqual(entry["last_update_id"], 14)

    async def test_send_message_retries_transient_transport_failure(self):
        """Two transport errors followed by success yield three posts and two sleeps."""
        service = self._build_service()
        session = _FlakyTelegramSession(
            [
                asyncio.TimeoutError(),
                aiohttp.ClientConnectionError("falha temporaria"),
                {"ok": True},
            ]
        )

        # Patch the sleep seen by the service so the retries do not actually wait.
        with patch(f"{self._SERVICE_MODULE}.asyncio.sleep", AsyncMock()) as sleep_mock:
            await service._send_message(session=session, chat_id=99, text="resposta teste")

        self.assertEqual(len(session.calls), 3)
        self.assertEqual(sleep_mock.await_count, 2)

    async def test_handle_update_swallows_unexpected_delivery_failure(self):
        """A ``RuntimeError`` from ``_send_message`` is logged instead of propagating."""
        service = self._build_service()
        update = self._text_update(15, "status do pedido", message_id=89)

        no_images = patch.object(service, "_extract_image_attachments", AsyncMock(return_value=[]))
        stub_turn = patch.object(
            service,
            "_process_message",
            AsyncMock(return_value="Pedido encontrado."),
        )
        broken_send = patch.object(
            service,
            "_send_message",
            AsyncMock(side_effect=RuntimeError("falha inesperada de entrega")),
        )
        exception_spy = patch(f"{self._SERVICE_MODULE}.logger.exception")

        with no_images, stub_turn, broken_send, exception_spy as logger_exception:
            await service._handle_update(session=SimpleNamespace(), update=update)

        self.assertTrue(logger_exception.called)

    async def test_persist_last_processed_update_id_keeps_highest_seen_value(self):
        """Persisting a lower id after a higher one leaves the higher id in place."""
        service = self._build_service()

        service._persist_last_processed_update_id(11)
        service._persist_last_processed_update_id(10)

        entry = service.state.get_entry(TELEGRAM_RUNTIME_BUCKET, TELEGRAM_RUNTIME_OWNER_ID)
        self.assertEqual(entry["last_update_id"], 11)
        self.assertEqual(service._last_update_id, 11)

    async def test_schedule_update_processing_allows_parallel_chats(self):
        """An update for chat 2 starts while chat 1's handler is still blocked."""
        service = self._build_service()
        release_first_chat = asyncio.Event()
        chat_one_started = asyncio.Event()
        chat_two_started = asyncio.Event()
        started_chats: list[int] = []

        async def fake_handle_update(*, session, update):
            chat_id = update["message"]["chat"]["id"]
            started_chats.append(chat_id)
            if chat_id == 1:
                chat_one_started.set()
                await release_first_chat.wait()
                return
            chat_two_started.set()

        with patch.object(service, "_handle_update", new=fake_handle_update):
            try:
                await service._schedule_update_processing(
                    session=SimpleNamespace(),
                    update={"update_id": 1, "message": {"chat": {"id": 1}, "text": "primeiro"}},
                )
                # Bounded waits: a scheduling regression fails the test instead of hanging it.
                await asyncio.wait_for(chat_one_started.wait(), timeout=1)
                await service._schedule_update_processing(
                    session=SimpleNamespace(),
                    update={"update_id": 2, "message": {"chat": {"id": 2}, "text": "segundo"}},
                )
                # Chat 1 is still parked on ``release_first_chat`` here, so chat 2
                # starting at all proves the two chats are handled concurrently.
                await asyncio.wait_for(chat_two_started.wait(), timeout=1)

                self.assertEqual(started_chats, [1, 2])
            finally:
                # Always unblock the parked handler so teardown never waits on it.
                release_first_chat.set()
            await asyncio.sleep(0)

    async def test_schedule_update_processing_preserves_order_per_chat(self):
        """A second update for the same chat starts only after the first finishes."""
        service = self._build_service()
        first_started = asyncio.Event()
        allow_first_to_finish = asyncio.Event()
        second_started = asyncio.Event()
        started_updates: list[int] = []

        async def fake_handle_update(*, session, update):
            update_id = update["update_id"]
            started_updates.append(update_id)
            if update_id == 1:
                first_started.set()
                await allow_first_to_finish.wait()
                return
            second_started.set()

        with patch.object(service, "_handle_update", new=fake_handle_update):
            try:
                await service._schedule_update_processing(
                    session=SimpleNamespace(),
                    update={"update_id": 1, "message": {"chat": {"id": 1}, "text": "primeiro"}},
                )
                # Bounded wait: a scheduling regression fails the test instead of hanging it.
                await asyncio.wait_for(first_started.wait(), timeout=1)
                await service._schedule_update_processing(
                    session=SimpleNamespace(),
                    update={"update_id": 2, "message": {"chat": {"id": 1}, "text": "segundo"}},
                )
                await asyncio.sleep(0)
                self.assertFalse(second_started.is_set())

                allow_first_to_finish.set()
                await asyncio.wait_for(second_started.wait(), timeout=1)
            finally:
                # Always unblock the parked handler so teardown never waits on it.
                allow_first_to_finish.set()

        self.assertEqual(started_updates, [1, 2])
|
|
|