import unittest import asyncio from types import SimpleNamespace from unittest.mock import AsyncMock, patch import aiohttp from fastapi import HTTPException from app.integrations.telegram_satellite_service import ( TELEGRAM_RUNTIME_BUCKET, TELEGRAM_RUNTIME_OWNER_ID, TelegramSatelliteService, ) from app.services.orchestration.conversation_state_store import ConversationStateStore class _DummySession: def close(self): return None class _FakeTelegramResponse: def __init__(self, payload): self.payload = payload async def __aenter__(self): return self async def __aexit__(self, exc_type, exc, tb): return None async def json(self): return self.payload class _FakeTelegramSession: def __init__(self, payload): self.payload = payload self.calls = [] def post(self, url, json): self.calls.append((url, json)) return _FakeTelegramResponse(self.payload) class _FlakyTelegramResponse: def __init__(self, outcome): self.outcome = outcome async def __aenter__(self): if isinstance(self.outcome, BaseException): raise self.outcome return self async def __aexit__(self, exc_type, exc, tb): return None async def json(self): return self.outcome class _FlakyTelegramSession: def __init__(self, outcomes): self.outcomes = list(outcomes) self.calls = [] def post(self, url, json): self.calls.append((url, json)) if self.outcomes: outcome = self.outcomes.pop(0) else: outcome = {"ok": True} return _FlakyTelegramResponse(outcome) class TelegramMultimodalTests(unittest.IsolatedAsyncioTestCase): def _build_service(self) -> TelegramSatelliteService: service = TelegramSatelliteService( "token-teste", state_repository=ConversationStateStore(), ) self._service_under_test = service return service async def asyncTearDown(self): service = getattr(self, "_service_under_test", None) if service is not None: await service._shutdown_chat_workers() async def test_process_message_uses_extracted_image_message(self): service = self._build_service() tools_db = _DummySession() mock_db = _DummySession() with patch("app.integrations.telegram_satellite_service.SessionLocal", return_value=tools_db), patch( "app.integrations.telegram_satellite_service.SessionMockLocal", return_value=mock_db, ), patch("app.integrations.telegram_satellite_service.UserService") as user_service_cls, patch( "app.integrations.telegram_satellite_service.OrquestradorService" ) as orchestrator_cls, patch.object( service, "_build_orchestration_message_from_image", AsyncMock(return_value="[imagem recebida no telegram]\nDados extraidos da imagem: Registrar pagamento de aluguel: placa ABC1D23; valor 293,47; data_pagamento 17/03/2026 14:30; identificador_comprovante NSU123."), ): user_service_cls.return_value.get_or_create.return_value = SimpleNamespace(id=7) orchestrator_cls.return_value.handle_message = AsyncMock(return_value="ok") answer = await service._process_message( text="segue o comprovante", sender={"id": 99, "first_name": "Vitor"}, chat_id=99, image_attachments=[{"mime_type": "image/jpeg", "data": b"123"}], ) self.assertEqual(answer, "ok") orchestrator_cls.return_value.handle_message.assert_awaited_once() kwargs = orchestrator_cls.return_value.handle_message.await_args.kwargs self.assertIn("Registrar pagamento de aluguel", kwargs["message"]) self.assertEqual(kwargs["user_id"], 7) async def test_process_message_returns_direct_failure_for_unreadable_image(self): service = self._build_service() tools_db = _DummySession() mock_db = _DummySession() with patch("app.integrations.telegram_satellite_service.SessionLocal", return_value=tools_db), patch( "app.integrations.telegram_satellite_service.SessionMockLocal", return_value=mock_db, ), patch("app.integrations.telegram_satellite_service.UserService") as user_service_cls, patch( "app.integrations.telegram_satellite_service.OrquestradorService" ) as orchestrator_cls, patch.object( service, "_build_orchestration_message_from_image", AsyncMock(return_value="Nao consegui identificar os dados da imagem. Descreva o documento ou envie uma foto mais nitida."), ): user_service_cls.return_value.get_or_create.return_value = SimpleNamespace(id=7) orchestrator_cls.return_value.handle_message = AsyncMock() answer = await service._process_message( text="", sender={"id": 99}, chat_id=99, image_attachments=[{"mime_type": "image/jpeg", "data": b"123"}], ) self.assertIn("Nao consegui identificar os dados da imagem", answer) self.assertFalse(orchestrator_cls.return_value.handle_message.await_count) async def test_process_message_returns_direct_failure_for_receipt_without_watermark(self): service = self._build_service() tools_db = _DummySession() mock_db = _DummySession() with patch("app.integrations.telegram_satellite_service.SessionLocal", return_value=tools_db), patch( "app.integrations.telegram_satellite_service.SessionMockLocal", return_value=mock_db, ), patch("app.integrations.telegram_satellite_service.UserService") as user_service_cls, patch( "app.integrations.telegram_satellite_service.OrquestradorService" ) as orchestrator_cls, patch.object( service, "_build_orchestration_message_from_image", AsyncMock(return_value="O comprovante enviado nao e valido. Envie um comprovante valido com a marca d'agua SysaltiIA visivel."), ): user_service_cls.return_value.get_or_create.return_value = SimpleNamespace(id=7) orchestrator_cls.return_value.handle_message = AsyncMock() answer = await service._process_message( text="segue o comprovante", sender={"id": 99}, chat_id=99, image_attachments=[{"mime_type": "image/jpeg", "data": b"123"}], ) self.assertIn("marca d'agua SysaltiIA visivel", answer) self.assertFalse(orchestrator_cls.return_value.handle_message.await_count) async def test_process_message_offloads_blocking_turn_to_worker_thread(self): service = self._build_service() with patch( "app.integrations.telegram_satellite_service.asyncio.to_thread", AsyncMock(return_value="ok"), ) as to_thread: answer = await service._process_message( text="quero ver a frota", sender={"id": 99, "first_name": "Vitor"}, chat_id=99, image_attachments=[], ) self.assertEqual(answer, "ok") self.assertEqual(to_thread.await_count, 1) self.assertEqual(to_thread.await_args.kwargs["message_text"], "quero ver a frota") self.assertEqual(to_thread.await_args.kwargs["chat_id"], 99) self.assertEqual(to_thread.await_args.kwargs["sender"]["id"], 99) async def test_handle_update_masks_sensitive_domain_error_in_logs(self): service = self._build_service() update = { "update_id": 1, "message": { "chat": {"id": 99}, "from": {"id": 99}, "text": "segue o pagamento", }, } with patch.object(service, "_extract_image_attachments", AsyncMock(return_value=[])), patch.object( service, "_process_message", AsyncMock( side_effect=HTTPException( status_code=409, detail={ "cpf": "12345678909", "placa": "ABC1D23", "external_id": "987654321", "identificador_comprovante": "NSU123", }, ) ), ), patch.object(service, "_send_message", AsyncMock()), patch( "app.integrations.telegram_satellite_service.logger.warning" ) as logger_warning: await service._handle_update(session=SimpleNamespace(), update=update) self.assertTrue(logger_warning.called) logged_detail = str(logger_warning.call_args.args[1]) self.assertNotIn("12345678909", logged_detail) self.assertNotIn("ABC1D23", logged_detail) self.assertNotIn("987654321", logged_detail) self.assertNotIn("NSU123", logged_detail) self.assertIn("***.***.***-09", logged_detail) self.assertIn("ABC***3", logged_detail) self.assertIn("******321", logged_detail) self.assertIn("***123", logged_detail) async def test_handle_update_reuses_cached_answer_for_duplicate_message(self): service = self._build_service() update = { "update_id": 10, "message": { "message_id": 77, "chat": {"id": 99}, "from": {"id": 99}, "text": "quero ver a frota", }, } with patch.object(service, "_extract_image_attachments", AsyncMock(return_value=[])), patch.object( service, "_process_message", AsyncMock(return_value="Segue a frota disponivel."), ) as process_message, patch.object(service, "_send_message", AsyncMock()) as send_message: await service._handle_update(session=SimpleNamespace(), update=update) await service._handle_update(session=SimpleNamespace(), update=update) self.assertEqual(process_message.await_count, 1) self.assertEqual(send_message.await_count, 2) first_text = send_message.await_args_list[0].kwargs["text"] second_text = send_message.await_args_list[1].kwargs["text"] self.assertEqual(first_text, "Segue a frota disponivel.") self.assertEqual(second_text, "Segue a frota disponivel.") async def test_handle_update_processes_same_text_again_when_message_id_changes(self): service = self._build_service() first_update = { "update_id": 10, "message": { "message_id": 77, "chat": {"id": 99}, "from": {"id": 99}, "text": "quero ver a frota", }, } second_update = { "update_id": 11, "message": { "message_id": 78, "chat": {"id": 99}, "from": {"id": 99}, "text": "quero ver a frota", }, } with patch.object(service, "_extract_image_attachments", AsyncMock(return_value=[])), patch.object( service, "_process_message", AsyncMock(side_effect=["Resposta 1", "Resposta 2"]), ) as process_message, patch.object(service, "_send_message", AsyncMock()) as send_message: await service._handle_update(session=SimpleNamespace(), update=first_update) await service._handle_update(session=SimpleNamespace(), update=second_update) self.assertEqual(process_message.await_count, 2) self.assertEqual(send_message.await_count, 2) self.assertEqual(send_message.await_args_list[0].kwargs["text"], "Resposta 1") self.assertEqual(send_message.await_args_list[1].kwargs["text"], "Resposta 2") async def test_initialize_offset_uses_persisted_cursor(self): service = self._build_service() service.state.set_entry( TELEGRAM_RUNTIME_BUCKET, TELEGRAM_RUNTIME_OWNER_ID, {"last_update_id": 41}, ) offset = await service._initialize_offset(session=SimpleNamespace()) self.assertEqual(offset, 42) self.assertEqual(service._last_update_id, 41) async def test_initialize_offset_bootstraps_cursor_once_when_missing(self): service = self._build_service() session = _FakeTelegramSession( { "ok": True, "result": [ {"update_id": 5}, {"update_id": 6}, ], } ) offset = await service._initialize_offset(session=session) self.assertEqual(offset, 7) self.assertEqual(service._last_update_id, 6) entry = service.state.get_entry(TELEGRAM_RUNTIME_BUCKET, TELEGRAM_RUNTIME_OWNER_ID) self.assertEqual(entry["last_update_id"], 6) self.assertEqual(len(session.calls), 1) async def test_handle_update_persists_runtime_cursor(self): service = self._build_service() update = { "update_id": 14, "message": { "message_id": 88, "chat": {"id": 99}, "from": {"id": 99}, "text": "status do pedido", }, } with patch.object(service, "_extract_image_attachments", AsyncMock(return_value=[])), patch.object( service, "_process_message", AsyncMock(return_value="Pedido encontrado."), ), patch.object(service, "_send_message", AsyncMock()): await service._handle_update(session=SimpleNamespace(), update=update) entry = service.state.get_entry(TELEGRAM_RUNTIME_BUCKET, TELEGRAM_RUNTIME_OWNER_ID) self.assertEqual(entry["last_update_id"], 14) async def test_send_message_retries_transient_transport_failure(self): service = self._build_service() session = _FlakyTelegramSession( [ asyncio.TimeoutError(), aiohttp.ClientConnectionError("falha temporaria"), {"ok": True}, ] ) with patch("app.integrations.telegram_satellite_service.asyncio.sleep", AsyncMock()) as sleep_mock: await service._send_message(session=session, chat_id=99, text="resposta teste") self.assertEqual(len(session.calls), 3) self.assertEqual(sleep_mock.await_count, 2) async def test_handle_update_swallows_unexpected_delivery_failure(self): service = self._build_service() update = { "update_id": 15, "message": { "message_id": 89, "chat": {"id": 99}, "from": {"id": 99}, "text": "status do pedido", }, } with patch.object(service, "_extract_image_attachments", AsyncMock(return_value=[])), patch.object( service, "_process_message", AsyncMock(return_value="Pedido encontrado."), ), patch.object( service, "_send_message", AsyncMock(side_effect=RuntimeError("falha inesperada de entrega")), ), patch("app.integrations.telegram_satellite_service.logger.exception") as logger_exception: await service._handle_update(session=SimpleNamespace(), update=update) self.assertTrue(logger_exception.called) async def test_persist_last_processed_update_id_keeps_highest_seen_value(self): service = self._build_service() service._persist_last_processed_update_id(11) service._persist_last_processed_update_id(10) entry = service.state.get_entry(TELEGRAM_RUNTIME_BUCKET, TELEGRAM_RUNTIME_OWNER_ID) self.assertEqual(entry["last_update_id"], 11) self.assertEqual(service._last_update_id, 11) async def test_schedule_update_processing_allows_parallel_chats(self): service = self._build_service() release_first_chat = asyncio.Event() chat_one_started = asyncio.Event() started_chats: list[int] = [] async def fake_handle_update(*, session, update): chat_id = update["message"]["chat"]["id"] started_chats.append(chat_id) if chat_id == 1: chat_one_started.set() await release_first_chat.wait() with patch.object(service, "_handle_update", new=fake_handle_update): await service._schedule_update_processing( session=SimpleNamespace(), update={"update_id": 1, "message": {"chat": {"id": 1}, "text": "primeiro"}}, ) await chat_one_started.wait() await service._schedule_update_processing( session=SimpleNamespace(), update={"update_id": 2, "message": {"chat": {"id": 2}, "text": "segundo"}}, ) await asyncio.sleep(0) self.assertEqual(started_chats, [1, 2]) release_first_chat.set() await asyncio.sleep(0) async def test_schedule_update_processing_preserves_order_per_chat(self): service = self._build_service() first_started = asyncio.Event() allow_first_to_finish = asyncio.Event() second_started = asyncio.Event() started_updates: list[int] = [] async def fake_handle_update(*, session, update): update_id = update["update_id"] started_updates.append(update_id) if update_id == 1: first_started.set() await allow_first_to_finish.wait() return second_started.set() with patch.object(service, "_handle_update", new=fake_handle_update): await service._schedule_update_processing( session=SimpleNamespace(), update={"update_id": 1, "message": {"chat": {"id": 1}, "text": "primeiro"}}, ) await first_started.wait() await service._schedule_update_processing( session=SimpleNamespace(), update={"update_id": 2, "message": {"chat": {"id": 1}, "text": "segundo"}}, ) await asyncio.sleep(0) self.assertFalse(second_started.is_set()) allow_first_to_finish.set() await asyncio.wait_for(second_started.wait(), timeout=1) self.assertEqual(started_updates, [1, 2])