diff --git a/app/integrations/telegram_satellite_service.py b/app/integrations/telegram_satellite_service.py index f3b14d0..81c0eae 100644 --- a/app/integrations/telegram_satellite_service.py +++ b/app/integrations/telegram_satellite_service.py @@ -2,20 +2,24 @@ import asyncio import logging import os import tempfile +from datetime import timedelta from typing import Any, Dict, List import aiohttp from fastapi import HTTPException from app.core.settings import settings +from app.core.time_utils import utc_now from app.db.database import SessionLocal from app.db.mock_database import SessionMockLocal from app.services.ai.llm_service import ( IMAGE_ANALYSIS_BLOCKING_PREFIXES, LLMService, ) +from app.services.orchestration.conversation_state_repository import ConversationStateRepository from app.services.orchestration.orquestrador_service import OrquestradorService from app.services.orchestration.sensitive_data import mask_sensitive_payload +from app.services.orchestration.state_repository_factory import get_conversation_state_repository from app.services.user.user_service import UserService @@ -23,6 +27,8 @@ logger = logging.getLogger(__name__) TELEGRAM_MESSAGE_SAFE_LIMIT = 3800 TELEGRAM_MAX_CONCURRENT_CHATS = 8 +TELEGRAM_IDEMPOTENCY_BUCKET = "telegram_processed_messages" +TELEGRAM_IDEMPOTENCY_CACHE_LIMIT = 100 def _split_telegram_text(text: str, limit: int = TELEGRAM_MESSAGE_SAFE_LIMIT) -> List[str]: @@ -125,12 +131,17 @@ class TelegramSatelliteService: Processa mensagens direto no OrquestradorService e publica respostas no chat. """ - def __init__(self, token: str): + def __init__( + self, + token: str, + state_repository: ConversationStateRepository | None = None, + ): """Configura cliente Telegram com URL base e timeouts padrao.""" self.base_url = f"https://api.telegram.org/bot{token}" self.file_base_url = f"https://api.telegram.org/file/bot{token}" self.polling_timeout = settings.telegram_polling_timeout self.request_timeout = settings.telegram_request_timeout + self.state = state_repository or get_conversation_state_repository() self._last_update_id = -1 self._chat_queues: dict[int, asyncio.Queue[Dict[str, Any]]] = {} self._chat_workers: dict[int, asyncio.Task[None]] = {} @@ -167,6 +178,72 @@ class TelegramSatelliteService: chat_id = chat.get("id") return chat_id if isinstance(chat_id, int) else None + def _build_update_idempotency_key(self, update: Dict[str, Any]) -> str | None: + chat_id = self._extract_chat_id(update) + message = update.get("message", {}) + message_id = message.get("message_id") + if isinstance(chat_id, int) and isinstance(message_id, int): + return f"telegram:message:{chat_id}:{message_id}" + + update_id = update.get("update_id") + if isinstance(update_id, int): + return f"telegram:update:{update_id}" + return None + + def _idempotency_owner_id(self, update: Dict[str, Any]) -> int | None: + chat_id = self._extract_chat_id(update) + if isinstance(chat_id, int): + return chat_id + update_id = update.get("update_id") + return update_id if isinstance(update_id, int) else None + + def _get_processed_update(self, update: Dict[str, Any]) -> dict | None: + owner_id = self._idempotency_owner_id(update) + idempotency_key = self._build_update_idempotency_key(update) + if owner_id is None or not idempotency_key: + return None + + entry = self.state.get_entry(TELEGRAM_IDEMPOTENCY_BUCKET, owner_id, expire=True) + if not isinstance(entry, dict): + return None + + items = entry.get("items") + if not isinstance(items, dict): + return None + payload = items.get(idempotency_key) + return payload if isinstance(payload, dict) else None + + def _store_processed_update(self, update: Dict[str, Any], answer: str) -> None: + owner_id = self._idempotency_owner_id(update) + idempotency_key = self._build_update_idempotency_key(update) + if owner_id is None or not idempotency_key: + return + + now = utc_now().replace(microsecond=0) + expires_at = now + timedelta(minutes=settings.conversation_state_ttl_minutes) + entry = self.state.get_entry(TELEGRAM_IDEMPOTENCY_BUCKET, owner_id, expire=True) or {} + items = dict(entry.get("items") or {}) + items[idempotency_key] = { + "answer": str(answer or ""), + "processed_at": now, + } + if len(items) > TELEGRAM_IDEMPOTENCY_CACHE_LIMIT: + ordered = sorted( + items.items(), + key=lambda item: item[1].get("processed_at") or now, + reverse=True, + ) + items = dict(ordered[:TELEGRAM_IDEMPOTENCY_CACHE_LIMIT]) + + self.state.set_entry( + TELEGRAM_IDEMPOTENCY_BUCKET, + owner_id, + { + "items": items, + "expires_at": expires_at, + }, + ) + async def _schedule_update_processing( self, session: aiohttp.ClientSession, @@ -310,9 +387,23 @@ class TelegramSatelliteService: chat_id = chat.get("id") sender = message.get("from", {}) + if not chat_id: + return + cached_update = self._get_processed_update(update) + if cached_update: + cached_answer = str(cached_update.get("answer") or "").strip() + if cached_answer: + logger.info( + "Reutilizando resposta em reentrega do Telegram. chat_id=%s update_key=%s", + chat_id, + self._build_update_idempotency_key(update), + ) + await self._send_message(session=session, chat_id=chat_id, text=cached_answer) + return + image_attachments = await self._extract_image_attachments(session=session, message=message) - if (not text and not image_attachments) or not chat_id: + if not text and not image_attachments: return try: @@ -329,6 +420,7 @@ class TelegramSatelliteService: logger.exception("Erro ao processar mensagem do Telegram.") answer = "Nao consegui processar sua solicitacao agora. Tente novamente em instantes." + self._store_processed_update(update=update, answer=answer) await self._send_message(session=session, chat_id=chat_id, text=answer) async def _send_message( diff --git a/app/services/orchestration/conversation_state_store.py b/app/services/orchestration/conversation_state_store.py index 89f87d5..73dea6b 100644 --- a/app/services/orchestration/conversation_state_store.py +++ b/app/services/orchestration/conversation_state_store.py @@ -19,6 +19,7 @@ class ConversationStateStore(ConversationStateRepository): self.pending_stock_selections: dict[int, dict] = {} self.pending_rental_drafts: dict[int, dict] = {} self.pending_rental_selections: dict[int, dict] = {} + self.telegram_processed_messages: dict[int, dict] = {} def upsert_user_context(self, user_id: int | None, ttl_minutes: int) -> None: if user_id is None: @@ -83,3 +84,4 @@ class ConversationStateStore(ConversationStateRepository): if user_id is None: return None return getattr(self, bucket).pop(user_id, None) + diff --git a/tests/test_telegram_multimodal.py b/tests/test_telegram_multimodal.py index ae4b636..f45d27c 100644 --- a/tests/test_telegram_multimodal.py +++ b/tests/test_telegram_multimodal.py @@ -6,6 +6,7 @@ from unittest.mock import AsyncMock, patch from fastapi import HTTPException from app.integrations.telegram_satellite_service import TelegramSatelliteService +from app.services.orchestration.conversation_state_store import ConversationStateStore class _DummySession: @@ -14,14 +15,21 @@ class _DummySession: class TelegramMultimodalTests(unittest.IsolatedAsyncioTestCase): + def _build_service(self) -> TelegramSatelliteService: + service = TelegramSatelliteService( + "token-teste", + state_repository=ConversationStateStore(), + ) + self._service_under_test = service + return service + async def asyncTearDown(self): service = getattr(self, "_service_under_test", None) if service is not None: await service._shutdown_chat_workers() async def test_process_message_uses_extracted_image_message(self): - service = TelegramSatelliteService("token-teste") - self._service_under_test = service + service = self._build_service() tools_db = _DummySession() mock_db = _DummySession() @@ -52,8 +60,7 @@ class TelegramMultimodalTests(unittest.IsolatedAsyncioTestCase): self.assertEqual(kwargs["user_id"], 7) async def test_process_message_returns_direct_failure_for_unreadable_image(self): - service = TelegramSatelliteService("token-teste") - self._service_under_test = service + service = self._build_service() tools_db = _DummySession() mock_db = _DummySession() @@ -81,8 +88,7 @@ class TelegramMultimodalTests(unittest.IsolatedAsyncioTestCase): self.assertFalse(orchestrator_cls.return_value.handle_message.await_count) async def test_process_message_returns_direct_failure_for_receipt_without_watermark(self): - service = TelegramSatelliteService("token-teste") - self._service_under_test = service + service = self._build_service() tools_db = _DummySession() mock_db = _DummySession() @@ -110,8 +116,7 @@ class TelegramMultimodalTests(unittest.IsolatedAsyncioTestCase): self.assertFalse(orchestrator_cls.return_value.handle_message.await_count) async def test_handle_update_masks_sensitive_domain_error_in_logs(self): - service = TelegramSatelliteService("token-teste") - self._service_under_test = service + service = self._build_service() update = { "update_id": 1, "message": { @@ -151,9 +156,69 @@ class TelegramMultimodalTests(unittest.IsolatedAsyncioTestCase): self.assertIn("******321", logged_detail) self.assertIn("***123", logged_detail) + async def test_handle_update_reuses_cached_answer_for_duplicate_message(self): + service = self._build_service() + update = { + "update_id": 10, + "message": { + "message_id": 77, + "chat": {"id": 99}, + "from": {"id": 99}, + "text": "quero ver a frota", + }, + } + + with patch.object(service, "_extract_image_attachments", AsyncMock(return_value=[])), patch.object( + service, + "_process_message", + AsyncMock(return_value="Segue a frota disponivel."), + ) as process_message, patch.object(service, "_send_message", AsyncMock()) as send_message: + await service._handle_update(session=SimpleNamespace(), update=update) + await service._handle_update(session=SimpleNamespace(), update=update) + + self.assertEqual(process_message.await_count, 1) + self.assertEqual(send_message.await_count, 2) + first_text = send_message.await_args_list[0].kwargs["text"] + second_text = send_message.await_args_list[1].kwargs["text"] + self.assertEqual(first_text, "Segue a frota disponivel.") + self.assertEqual(second_text, "Segue a frota disponivel.") + + async def test_handle_update_processes_same_text_again_when_message_id_changes(self): + service = self._build_service() + first_update = { + "update_id": 10, + "message": { + "message_id": 77, + "chat": {"id": 99}, + "from": {"id": 99}, + "text": "quero ver a frota", + }, + } + second_update = { + "update_id": 11, + "message": { + "message_id": 78, + "chat": {"id": 99}, + "from": {"id": 99}, + "text": "quero ver a frota", + }, + } + + with patch.object(service, "_extract_image_attachments", AsyncMock(return_value=[])), patch.object( + service, + "_process_message", + AsyncMock(side_effect=["Resposta 1", "Resposta 2"]), + ) as process_message, patch.object(service, "_send_message", AsyncMock()) as send_message: + await service._handle_update(session=SimpleNamespace(), update=first_update) + await service._handle_update(session=SimpleNamespace(), update=second_update) + + self.assertEqual(process_message.await_count, 2) + self.assertEqual(send_message.await_count, 2) + self.assertEqual(send_message.await_args_list[0].kwargs["text"], "Resposta 1") + self.assertEqual(send_message.await_args_list[1].kwargs["text"], "Resposta 2") + async def test_schedule_update_processing_allows_parallel_chats(self): - service = TelegramSatelliteService("token-teste") - self._service_under_test = service + service = self._build_service() release_first_chat = asyncio.Event() chat_one_started = asyncio.Event() started_chats: list[int] = [] @@ -182,8 +247,7 @@ class TelegramMultimodalTests(unittest.IsolatedAsyncioTestCase): await asyncio.sleep(0) async def test_schedule_update_processing_preserves_order_per_chat(self): - service = TelegramSatelliteService("token-teste") - self._service_under_test = service + service = self._build_service() first_started = asyncio.Event() allow_first_to_finish = asyncio.Event() second_started = asyncio.Event()