🔁 fix(telegram): garantir idempotencia em reentregas

Define uma chave de idempotencia por mensagem do Telegram usando chat_id e message_id, com fallback para update_id, para impedir reprocessamento de updates repetidos ou reentregues.

Reaproveita a resposta ja calculada nas duplicatas e registra o cache recente por chat no backend de estado, preservando exatamente uma execucao dos efeitos de negocio.

Amplia a cobertura com testes para mensagem duplicada, mesmo texto com message_id novo e preservacao do comportamento atual do satellite.
main
parent be4992f9c6
commit 2ef283b170

@ -2,20 +2,24 @@ import asyncio
import logging import logging
import os import os
import tempfile import tempfile
from datetime import timedelta
from typing import Any, Dict, List from typing import Any, Dict, List
import aiohttp import aiohttp
from fastapi import HTTPException from fastapi import HTTPException
from app.core.settings import settings from app.core.settings import settings
from app.core.time_utils import utc_now
from app.db.database import SessionLocal from app.db.database import SessionLocal
from app.db.mock_database import SessionMockLocal from app.db.mock_database import SessionMockLocal
from app.services.ai.llm_service import ( from app.services.ai.llm_service import (
IMAGE_ANALYSIS_BLOCKING_PREFIXES, IMAGE_ANALYSIS_BLOCKING_PREFIXES,
LLMService, LLMService,
) )
from app.services.orchestration.conversation_state_repository import ConversationStateRepository
from app.services.orchestration.orquestrador_service import OrquestradorService from app.services.orchestration.orquestrador_service import OrquestradorService
from app.services.orchestration.sensitive_data import mask_sensitive_payload from app.services.orchestration.sensitive_data import mask_sensitive_payload
from app.services.orchestration.state_repository_factory import get_conversation_state_repository
from app.services.user.user_service import UserService from app.services.user.user_service import UserService
@ -23,6 +27,8 @@ logger = logging.getLogger(__name__)
TELEGRAM_MESSAGE_SAFE_LIMIT = 3800 TELEGRAM_MESSAGE_SAFE_LIMIT = 3800
TELEGRAM_MAX_CONCURRENT_CHATS = 8 TELEGRAM_MAX_CONCURRENT_CHATS = 8
TELEGRAM_IDEMPOTENCY_BUCKET = "telegram_processed_messages"
TELEGRAM_IDEMPOTENCY_CACHE_LIMIT = 100
def _split_telegram_text(text: str, limit: int = TELEGRAM_MESSAGE_SAFE_LIMIT) -> List[str]: def _split_telegram_text(text: str, limit: int = TELEGRAM_MESSAGE_SAFE_LIMIT) -> List[str]:
@ -125,12 +131,17 @@ class TelegramSatelliteService:
Processa mensagens direto no OrquestradorService e publica respostas no chat. Processa mensagens direto no OrquestradorService e publica respostas no chat.
""" """
def __init__(self, token: str): def __init__(
self,
token: str,
state_repository: ConversationStateRepository | None = None,
):
"""Configura cliente Telegram com URL base e timeouts padrao.""" """Configura cliente Telegram com URL base e timeouts padrao."""
self.base_url = f"https://api.telegram.org/bot{token}" self.base_url = f"https://api.telegram.org/bot{token}"
self.file_base_url = f"https://api.telegram.org/file/bot{token}" self.file_base_url = f"https://api.telegram.org/file/bot{token}"
self.polling_timeout = settings.telegram_polling_timeout self.polling_timeout = settings.telegram_polling_timeout
self.request_timeout = settings.telegram_request_timeout self.request_timeout = settings.telegram_request_timeout
self.state = state_repository or get_conversation_state_repository()
self._last_update_id = -1 self._last_update_id = -1
self._chat_queues: dict[int, asyncio.Queue[Dict[str, Any]]] = {} self._chat_queues: dict[int, asyncio.Queue[Dict[str, Any]]] = {}
self._chat_workers: dict[int, asyncio.Task[None]] = {} self._chat_workers: dict[int, asyncio.Task[None]] = {}
@ -167,6 +178,72 @@ class TelegramSatelliteService:
chat_id = chat.get("id") chat_id = chat.get("id")
return chat_id if isinstance(chat_id, int) else None return chat_id if isinstance(chat_id, int) else None
def _build_update_idempotency_key(self, update: Dict[str, Any]) -> str | None:
chat_id = self._extract_chat_id(update)
message = update.get("message", {})
message_id = message.get("message_id")
if isinstance(chat_id, int) and isinstance(message_id, int):
return f"telegram:message:{chat_id}:{message_id}"
update_id = update.get("update_id")
if isinstance(update_id, int):
return f"telegram:update:{update_id}"
return None
def _idempotency_owner_id(self, update: Dict[str, Any]) -> int | None:
chat_id = self._extract_chat_id(update)
if isinstance(chat_id, int):
return chat_id
update_id = update.get("update_id")
return update_id if isinstance(update_id, int) else None
def _get_processed_update(self, update: Dict[str, Any]) -> dict | None:
owner_id = self._idempotency_owner_id(update)
idempotency_key = self._build_update_idempotency_key(update)
if owner_id is None or not idempotency_key:
return None
entry = self.state.get_entry(TELEGRAM_IDEMPOTENCY_BUCKET, owner_id, expire=True)
if not isinstance(entry, dict):
return None
items = entry.get("items")
if not isinstance(items, dict):
return None
payload = items.get(idempotency_key)
return payload if isinstance(payload, dict) else None
def _store_processed_update(self, update: Dict[str, Any], answer: str) -> None:
owner_id = self._idempotency_owner_id(update)
idempotency_key = self._build_update_idempotency_key(update)
if owner_id is None or not idempotency_key:
return
now = utc_now().replace(microsecond=0)
expires_at = now + timedelta(minutes=settings.conversation_state_ttl_minutes)
entry = self.state.get_entry(TELEGRAM_IDEMPOTENCY_BUCKET, owner_id, expire=True) or {}
items = dict(entry.get("items") or {})
items[idempotency_key] = {
"answer": str(answer or ""),
"processed_at": now,
}
if len(items) > TELEGRAM_IDEMPOTENCY_CACHE_LIMIT:
ordered = sorted(
items.items(),
key=lambda item: item[1].get("processed_at") or now,
reverse=True,
)
items = dict(ordered[:TELEGRAM_IDEMPOTENCY_CACHE_LIMIT])
self.state.set_entry(
TELEGRAM_IDEMPOTENCY_BUCKET,
owner_id,
{
"items": items,
"expires_at": expires_at,
},
)
async def _schedule_update_processing( async def _schedule_update_processing(
self, self,
session: aiohttp.ClientSession, session: aiohttp.ClientSession,
@ -310,9 +387,23 @@ class TelegramSatelliteService:
chat_id = chat.get("id") chat_id = chat.get("id")
sender = message.get("from", {}) sender = message.get("from", {})
if not chat_id:
return
cached_update = self._get_processed_update(update)
if cached_update:
cached_answer = str(cached_update.get("answer") or "").strip()
if cached_answer:
logger.info(
"Reutilizando resposta em reentrega do Telegram. chat_id=%s update_key=%s",
chat_id,
self._build_update_idempotency_key(update),
)
await self._send_message(session=session, chat_id=chat_id, text=cached_answer)
return
image_attachments = await self._extract_image_attachments(session=session, message=message) image_attachments = await self._extract_image_attachments(session=session, message=message)
if (not text and not image_attachments) or not chat_id: if not text and not image_attachments:
return return
try: try:
@ -329,6 +420,7 @@ class TelegramSatelliteService:
logger.exception("Erro ao processar mensagem do Telegram.") logger.exception("Erro ao processar mensagem do Telegram.")
answer = "Nao consegui processar sua solicitacao agora. Tente novamente em instantes." answer = "Nao consegui processar sua solicitacao agora. Tente novamente em instantes."
self._store_processed_update(update=update, answer=answer)
await self._send_message(session=session, chat_id=chat_id, text=answer) await self._send_message(session=session, chat_id=chat_id, text=answer)
async def _send_message( async def _send_message(

@ -19,6 +19,7 @@ class ConversationStateStore(ConversationStateRepository):
self.pending_stock_selections: dict[int, dict] = {} self.pending_stock_selections: dict[int, dict] = {}
self.pending_rental_drafts: dict[int, dict] = {} self.pending_rental_drafts: dict[int, dict] = {}
self.pending_rental_selections: dict[int, dict] = {} self.pending_rental_selections: dict[int, dict] = {}
self.telegram_processed_messages: dict[int, dict] = {}
def upsert_user_context(self, user_id: int | None, ttl_minutes: int) -> None: def upsert_user_context(self, user_id: int | None, ttl_minutes: int) -> None:
if user_id is None: if user_id is None:
@ -83,3 +84,4 @@ class ConversationStateStore(ConversationStateRepository):
if user_id is None: if user_id is None:
return None return None
return getattr(self, bucket).pop(user_id, None) return getattr(self, bucket).pop(user_id, None)

@ -6,6 +6,7 @@ from unittest.mock import AsyncMock, patch
from fastapi import HTTPException from fastapi import HTTPException
from app.integrations.telegram_satellite_service import TelegramSatelliteService from app.integrations.telegram_satellite_service import TelegramSatelliteService
from app.services.orchestration.conversation_state_store import ConversationStateStore
class _DummySession: class _DummySession:
@ -14,14 +15,21 @@ class _DummySession:
class TelegramMultimodalTests(unittest.IsolatedAsyncioTestCase): class TelegramMultimodalTests(unittest.IsolatedAsyncioTestCase):
def _build_service(self) -> TelegramSatelliteService:
service = TelegramSatelliteService(
"token-teste",
state_repository=ConversationStateStore(),
)
self._service_under_test = service
return service
async def asyncTearDown(self): async def asyncTearDown(self):
service = getattr(self, "_service_under_test", None) service = getattr(self, "_service_under_test", None)
if service is not None: if service is not None:
await service._shutdown_chat_workers() await service._shutdown_chat_workers()
async def test_process_message_uses_extracted_image_message(self): async def test_process_message_uses_extracted_image_message(self):
service = TelegramSatelliteService("token-teste") service = self._build_service()
self._service_under_test = service
tools_db = _DummySession() tools_db = _DummySession()
mock_db = _DummySession() mock_db = _DummySession()
@ -52,8 +60,7 @@ class TelegramMultimodalTests(unittest.IsolatedAsyncioTestCase):
self.assertEqual(kwargs["user_id"], 7) self.assertEqual(kwargs["user_id"], 7)
async def test_process_message_returns_direct_failure_for_unreadable_image(self): async def test_process_message_returns_direct_failure_for_unreadable_image(self):
service = TelegramSatelliteService("token-teste") service = self._build_service()
self._service_under_test = service
tools_db = _DummySession() tools_db = _DummySession()
mock_db = _DummySession() mock_db = _DummySession()
@ -81,8 +88,7 @@ class TelegramMultimodalTests(unittest.IsolatedAsyncioTestCase):
self.assertFalse(orchestrator_cls.return_value.handle_message.await_count) self.assertFalse(orchestrator_cls.return_value.handle_message.await_count)
async def test_process_message_returns_direct_failure_for_receipt_without_watermark(self): async def test_process_message_returns_direct_failure_for_receipt_without_watermark(self):
service = TelegramSatelliteService("token-teste") service = self._build_service()
self._service_under_test = service
tools_db = _DummySession() tools_db = _DummySession()
mock_db = _DummySession() mock_db = _DummySession()
@ -110,8 +116,7 @@ class TelegramMultimodalTests(unittest.IsolatedAsyncioTestCase):
self.assertFalse(orchestrator_cls.return_value.handle_message.await_count) self.assertFalse(orchestrator_cls.return_value.handle_message.await_count)
async def test_handle_update_masks_sensitive_domain_error_in_logs(self): async def test_handle_update_masks_sensitive_domain_error_in_logs(self):
service = TelegramSatelliteService("token-teste") service = self._build_service()
self._service_under_test = service
update = { update = {
"update_id": 1, "update_id": 1,
"message": { "message": {
@ -151,9 +156,69 @@ class TelegramMultimodalTests(unittest.IsolatedAsyncioTestCase):
self.assertIn("******321", logged_detail) self.assertIn("******321", logged_detail)
self.assertIn("***123", logged_detail) self.assertIn("***123", logged_detail)
async def test_handle_update_reuses_cached_answer_for_duplicate_message(self):
service = self._build_service()
update = {
"update_id": 10,
"message": {
"message_id": 77,
"chat": {"id": 99},
"from": {"id": 99},
"text": "quero ver a frota",
},
}
with patch.object(service, "_extract_image_attachments", AsyncMock(return_value=[])), patch.object(
service,
"_process_message",
AsyncMock(return_value="Segue a frota disponivel."),
) as process_message, patch.object(service, "_send_message", AsyncMock()) as send_message:
await service._handle_update(session=SimpleNamespace(), update=update)
await service._handle_update(session=SimpleNamespace(), update=update)
self.assertEqual(process_message.await_count, 1)
self.assertEqual(send_message.await_count, 2)
first_text = send_message.await_args_list[0].kwargs["text"]
second_text = send_message.await_args_list[1].kwargs["text"]
self.assertEqual(first_text, "Segue a frota disponivel.")
self.assertEqual(second_text, "Segue a frota disponivel.")
async def test_handle_update_processes_same_text_again_when_message_id_changes(self):
service = self._build_service()
first_update = {
"update_id": 10,
"message": {
"message_id": 77,
"chat": {"id": 99},
"from": {"id": 99},
"text": "quero ver a frota",
},
}
second_update = {
"update_id": 11,
"message": {
"message_id": 78,
"chat": {"id": 99},
"from": {"id": 99},
"text": "quero ver a frota",
},
}
with patch.object(service, "_extract_image_attachments", AsyncMock(return_value=[])), patch.object(
service,
"_process_message",
AsyncMock(side_effect=["Resposta 1", "Resposta 2"]),
) as process_message, patch.object(service, "_send_message", AsyncMock()) as send_message:
await service._handle_update(session=SimpleNamespace(), update=first_update)
await service._handle_update(session=SimpleNamespace(), update=second_update)
self.assertEqual(process_message.await_count, 2)
self.assertEqual(send_message.await_count, 2)
self.assertEqual(send_message.await_args_list[0].kwargs["text"], "Resposta 1")
self.assertEqual(send_message.await_args_list[1].kwargs["text"], "Resposta 2")
async def test_schedule_update_processing_allows_parallel_chats(self): async def test_schedule_update_processing_allows_parallel_chats(self):
service = TelegramSatelliteService("token-teste") service = self._build_service()
self._service_under_test = service
release_first_chat = asyncio.Event() release_first_chat = asyncio.Event()
chat_one_started = asyncio.Event() chat_one_started = asyncio.Event()
started_chats: list[int] = [] started_chats: list[int] = []
@ -182,8 +247,7 @@ class TelegramMultimodalTests(unittest.IsolatedAsyncioTestCase):
await asyncio.sleep(0) await asyncio.sleep(0)
async def test_schedule_update_processing_preserves_order_per_chat(self): async def test_schedule_update_processing_preserves_order_per_chat(self):
service = TelegramSatelliteService("token-teste") service = self._build_service()
self._service_under_test = service
first_started = asyncio.Event() first_started = asyncio.Event()
allow_first_to_finish = asyncio.Event() allow_first_to_finish = asyncio.Event()
second_started = asyncio.Event() second_started = asyncio.Event()

Loading…
Cancel
Save