From 6f690944129e87617940b0f6957a91fa38c86bf8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Vitor=20Hugo=20Belorio=20Sim=C3=A3o?= Date: Fri, 20 Mar 2026 17:22:05 -0300 Subject: [PATCH] =?UTF-8?q?=F0=9F=A7=B5=20refactor(telegram):=20retomar=20?= =?UTF-8?q?updates=20e=20isolar=20io=20bloqueante?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Persiste o ultimo update_id processado pelo satellite e retoma o polling a partir desse cursor no restart, usando a idempotencia por mensagem como rede de seguranca para reentregas. Move o turno bloqueante do orquestrador para worker thread com asyncio.to_thread, mantendo no loop async apenas o que e realmente assincrono e endurecendo o backend de estado em memoria para acesso concorrente com RLock. Amplia a cobertura do satellite com testes para cursor persistido, bootstrap inicial, atualizacao monotona do update_id e offload do processamento bloqueante, mantendo a suite completa verde com 218 testes. --- .../telegram_satellite_service.py | 96 ++++++++++++--- .../orchestration/conversation_state_store.py | 92 +++++++------- tests/test_telegram_multimodal.py | 115 +++++++++++++++++- 3 files changed, 244 insertions(+), 59 deletions(-) diff --git a/app/integrations/telegram_satellite_service.py b/app/integrations/telegram_satellite_service.py index 81c0eae..d23f068 100644 --- a/app/integrations/telegram_satellite_service.py +++ b/app/integrations/telegram_satellite_service.py @@ -29,6 +29,9 @@ TELEGRAM_MESSAGE_SAFE_LIMIT = 3800 TELEGRAM_MAX_CONCURRENT_CHATS = 8 TELEGRAM_IDEMPOTENCY_BUCKET = "telegram_processed_messages" TELEGRAM_IDEMPOTENCY_CACHE_LIMIT = 100 +TELEGRAM_RUNTIME_BUCKET = "telegram_runtime_state" +TELEGRAM_RUNTIME_OWNER_ID = 0 +TELEGRAM_RUNTIME_CURSOR_TTL_DAYS = 30 def _split_telegram_text(text: str, limit: int = TELEGRAM_MESSAGE_SAFE_LIMIT) -> List[str]: @@ -244,6 +247,33 @@ class TelegramSatelliteService: }, ) + def _get_runtime_state(self) -> dict: + entry = self.state.get_entry(TELEGRAM_RUNTIME_BUCKET, TELEGRAM_RUNTIME_OWNER_ID) + return entry if isinstance(entry, dict) else {} + + def _persist_last_processed_update_id(self, update_id: int) -> None: + if update_id < 0: + return + + entry = self._get_runtime_state() + current_last_update_id = entry.get("last_update_id") + if isinstance(current_last_update_id, int) and current_last_update_id >= update_id: + self._last_update_id = max(self._last_update_id, current_last_update_id) + return + + now = utc_now().replace(microsecond=0) + expires_at = now + timedelta(days=TELEGRAM_RUNTIME_CURSOR_TTL_DAYS) + self.state.set_entry( + TELEGRAM_RUNTIME_BUCKET, + TELEGRAM_RUNTIME_OWNER_ID, + { + "last_update_id": update_id, + "updated_at": now, + "expires_at": expires_at, + }, + ) + self._last_update_id = max(self._last_update_id, update_id) + async def _schedule_update_processing( self, session: aiohttp.ClientSession, @@ -328,9 +358,16 @@ class TelegramSatelliteService: async def _initialize_offset(self, session: aiohttp.ClientSession) -> int | None: """ - Descarta backlog pendente no startup para evitar respostas repetidas apos restart. - Retorna o offset inicial seguro para o loop principal. + Retoma o polling a partir do ultimo update persistido. + Sem cursor salvo, faz um bootstrap conservador e registra o ponto inicial. """ + runtime_state = self._get_runtime_state() + last_update_id = runtime_state.get("last_update_id") + if isinstance(last_update_id, int) and last_update_id >= 0: + self._last_update_id = last_update_id + logger.info("Retomando polling do Telegram a partir do update_id persistido %s.", last_update_id) + return last_update_id + 1 + payload: Dict[str, Any] = { "timeout": 0, "limit": 100, @@ -350,8 +387,11 @@ class TelegramSatelliteService: if last_id < 0: return None - self._last_update_id = last_id - logger.info("Startup com backlog descartado: %s update(s) anteriores ignorados.", len(updates)) + self._persist_last_processed_update_id(last_id) + logger.info( + "Bootstrap inicial do Telegram sem cursor persistido: %s update(s) anteriores ignorados.", + len(updates), + ) return last_id + 1 async def _get_updates( @@ -421,6 +461,9 @@ class TelegramSatelliteService: answer = "Nao consegui processar sua solicitacao agora. Tente novamente em instantes." self._store_processed_update(update=update, answer=answer) + update_id = update.get("update_id") + if isinstance(update_id, int): + self._persist_last_processed_update_id(update_id) await self._send_message(session=session, chat_id=chat_id, text=answer) async def _send_message( @@ -449,6 +492,34 @@ class TelegramSatelliteService: image_attachments: List[Dict[str, Any]] | None = None, ) -> str: """Encaminha mensagem ao orquestrador com usuario identificado e retorna resposta.""" + message_text = text + if image_attachments: + image_message = await self._build_orchestration_message_from_image( + caption=text, + image_attachments=image_attachments, + ) + if self._is_image_analysis_failure_message(image_message): + return image_message + message_text = image_message + + return await asyncio.to_thread( + self._run_blocking_orchestration_turn, + message_text=message_text, + sender=sender, + chat_id=chat_id, + ) + + def _run_blocking_orchestration_turn( + self, + *, + message_text: str, + sender: Dict[str, Any], + chat_id: int, + ) -> str: + """ + Executa o turno do orquestrador fora do loop async principal. + Isso isola sessoes SQLAlchemy sincronas e outras operacoes bloqueantes. + """ tools_db = SessionLocal() mock_db = SessionMockLocal() try: @@ -466,18 +537,11 @@ class TelegramSatelliteService: username=username, ) - message_text = text - if image_attachments: - image_message = await self._build_orchestration_message_from_image( - caption=text, - image_attachments=image_attachments, - ) - if self._is_image_analysis_failure_message(image_message): - return image_message - message_text = image_message - - service = OrquestradorService(tools_db) - return await service.handle_message(message=message_text, user_id=user.id) + service = OrquestradorService( + tools_db, + state_repository=self.state, + ) + return asyncio.run(service.handle_message(message=message_text, user_id=user.id)) finally: tools_db.close() mock_db.close() diff --git a/app/services/orchestration/conversation_state_store.py b/app/services/orchestration/conversation_state_store.py index 73dea6b..3d8413b 100644 --- a/app/services/orchestration/conversation_state_store.py +++ b/app/services/orchestration/conversation_state_store.py @@ -1,6 +1,7 @@ from datetime import datetime, timedelta -from app.core.time_utils import utc_now +from threading import RLock +from app.core.time_utils import utc_now from app.services.orchestration.conversation_state_repository import ConversationStateRepository @@ -8,6 +9,7 @@ from app.services.orchestration.conversation_state_repository import Conversatio # Serve como fallback simples para desenvolvimento e testes. class ConversationStateStore(ConversationStateRepository): def __init__(self) -> None: + self._lock = RLock() self.user_contexts: dict[int, dict] = {} self.pending_review_confirmations: dict[int, dict] = {} self.pending_review_drafts: dict[int, dict] = {} @@ -20,68 +22,74 @@ class ConversationStateStore(ConversationStateRepository): self.pending_rental_drafts: dict[int, dict] = {} self.pending_rental_selections: dict[int, dict] = {} self.telegram_processed_messages: dict[int, dict] = {} + self.telegram_runtime_state: dict[int, dict] = {} def upsert_user_context(self, user_id: int | None, ttl_minutes: int) -> None: if user_id is None: return - now = utc_now() - context = self.user_contexts.get(user_id) - if context and context["expires_at"] >= now: - context["expires_at"] = now + timedelta(minutes=ttl_minutes) - return - self.user_contexts[user_id] = { - "active_domain": "general", - "active_task": None, - "generic_memory": {}, - "shared_memory": {}, - "collected_slots": {}, - "flow_snapshots": {}, - "last_tool_result": None, - "order_queue": [], - "pending_order_selection": None, - "pending_switch": None, - "last_stock_results": [], - "selected_vehicle": None, - "last_rental_results": [], - "selected_rental_vehicle": None, - "expires_at": now + timedelta(minutes=ttl_minutes), - } + with self._lock: + now = utc_now() + context = self.user_contexts.get(user_id) + if context and context["expires_at"] >= now: + context["expires_at"] = now + timedelta(minutes=ttl_minutes) + return + self.user_contexts[user_id] = { + "active_domain": "general", + "active_task": None, + "generic_memory": {}, + "shared_memory": {}, + "collected_slots": {}, + "flow_snapshots": {}, + "last_tool_result": None, + "order_queue": [], + "pending_order_selection": None, + "pending_switch": None, + "last_stock_results": [], + "selected_vehicle": None, + "last_rental_results": [], + "selected_rental_vehicle": None, + "expires_at": now + timedelta(minutes=ttl_minutes), + } def get_user_context(self, user_id: int | None) -> dict | None: if user_id is None: return None - context = self.user_contexts.get(user_id) - if not context: - return None - if context["expires_at"] < utc_now(): - self.user_contexts.pop(user_id, None) - return None - return context + with self._lock: + context = self.user_contexts.get(user_id) + if not context: + return None + if context["expires_at"] < utc_now(): + self.user_contexts.pop(user_id, None) + return None + return context def save_user_context(self, user_id: int | None, context: dict) -> None: if user_id is None or not isinstance(context, dict): return - self.user_contexts[user_id] = context + with self._lock: + self.user_contexts[user_id] = context def get_entry(self, bucket: str, user_id: int | None, *, expire: bool = False) -> dict | None: if user_id is None: return None - entries = getattr(self, bucket) - entry = entries.get(user_id) - if not entry: - return None - if expire and entry.get("expires_at") and entry["expires_at"] < utc_now(): - entries.pop(user_id, None) - return None - return entry + with self._lock: + entries = getattr(self, bucket) + entry = entries.get(user_id) + if not entry: + return None + if expire and entry.get("expires_at") and entry["expires_at"] < utc_now(): + entries.pop(user_id, None) + return None + return entry def set_entry(self, bucket: str, user_id: int | None, value: dict) -> None: if user_id is None: return - getattr(self, bucket)[user_id] = value + with self._lock: + getattr(self, bucket)[user_id] = value def pop_entry(self, bucket: str, user_id: int | None) -> dict | None: if user_id is None: return None - return getattr(self, bucket).pop(user_id, None) - + with self._lock: + return getattr(self, bucket).pop(user_id, None) diff --git a/tests/test_telegram_multimodal.py b/tests/test_telegram_multimodal.py index f45d27c..f5820d5 100644 --- a/tests/test_telegram_multimodal.py +++ b/tests/test_telegram_multimodal.py @@ -5,7 +5,11 @@ from unittest.mock import AsyncMock, patch from fastapi import HTTPException -from app.integrations.telegram_satellite_service import TelegramSatelliteService +from app.integrations.telegram_satellite_service import ( + TELEGRAM_RUNTIME_BUCKET, + TELEGRAM_RUNTIME_OWNER_ID, + TelegramSatelliteService, +) from app.services.orchestration.conversation_state_store import ConversationStateStore @@ -14,6 +18,30 @@ class _DummySession: return None +class _FakeTelegramResponse: + def __init__(self, payload): + self.payload = payload + + async def __aenter__(self): + return self + + async def __aexit__(self, exc_type, exc, tb): + return None + + async def json(self): + return self.payload + + +class _FakeTelegramSession: + def __init__(self, payload): + self.payload = payload + self.calls = [] + + def post(self, url, json): + self.calls.append((url, json)) + return _FakeTelegramResponse(self.payload) + + class TelegramMultimodalTests(unittest.IsolatedAsyncioTestCase): def _build_service(self) -> TelegramSatelliteService: service = TelegramSatelliteService( @@ -115,6 +143,26 @@ class TelegramMultimodalTests(unittest.IsolatedAsyncioTestCase): self.assertIn("marca d'agua SysaltiIA visivel", answer) self.assertFalse(orchestrator_cls.return_value.handle_message.await_count) + async def test_process_message_offloads_blocking_turn_to_worker_thread(self): + service = self._build_service() + + with patch( + "app.integrations.telegram_satellite_service.asyncio.to_thread", + AsyncMock(return_value="ok"), + ) as to_thread: + answer = await service._process_message( + text="quero ver a frota", + sender={"id": 99, "first_name": "Vitor"}, + chat_id=99, + image_attachments=[], + ) + + self.assertEqual(answer, "ok") + self.assertEqual(to_thread.await_count, 1) + self.assertEqual(to_thread.await_args.kwargs["message_text"], "quero ver a frota") + self.assertEqual(to_thread.await_args.kwargs["chat_id"], 99) + self.assertEqual(to_thread.await_args.kwargs["sender"]["id"], 99) + async def test_handle_update_masks_sensitive_domain_error_in_logs(self): service = self._build_service() update = { @@ -217,6 +265,71 @@ class TelegramMultimodalTests(unittest.IsolatedAsyncioTestCase): self.assertEqual(send_message.await_args_list[0].kwargs["text"], "Resposta 1") self.assertEqual(send_message.await_args_list[1].kwargs["text"], "Resposta 2") + async def test_initialize_offset_uses_persisted_cursor(self): + service = self._build_service() + service.state.set_entry( + TELEGRAM_RUNTIME_BUCKET, + TELEGRAM_RUNTIME_OWNER_ID, + {"last_update_id": 41}, + ) + + offset = await service._initialize_offset(session=SimpleNamespace()) + + self.assertEqual(offset, 42) + self.assertEqual(service._last_update_id, 41) + + async def test_initialize_offset_bootstraps_cursor_once_when_missing(self): + service = self._build_service() + session = _FakeTelegramSession( + { + "ok": True, + "result": [ + {"update_id": 5}, + {"update_id": 6}, + ], + } + ) + + offset = await service._initialize_offset(session=session) + + self.assertEqual(offset, 7) + self.assertEqual(service._last_update_id, 6) + entry = service.state.get_entry(TELEGRAM_RUNTIME_BUCKET, TELEGRAM_RUNTIME_OWNER_ID) + self.assertEqual(entry["last_update_id"], 6) + self.assertEqual(len(session.calls), 1) + + async def test_handle_update_persists_runtime_cursor(self): + service = self._build_service() + update = { + "update_id": 14, + "message": { + "message_id": 88, + "chat": {"id": 99}, + "from": {"id": 99}, + "text": "status do pedido", + }, + } + + with patch.object(service, "_extract_image_attachments", AsyncMock(return_value=[])), patch.object( + service, + "_process_message", + AsyncMock(return_value="Pedido encontrado."), + ), patch.object(service, "_send_message", AsyncMock()): + await service._handle_update(session=SimpleNamespace(), update=update) + + entry = service.state.get_entry(TELEGRAM_RUNTIME_BUCKET, TELEGRAM_RUNTIME_OWNER_ID) + self.assertEqual(entry["last_update_id"], 14) + + async def test_persist_last_processed_update_id_keeps_highest_seen_value(self): + service = self._build_service() + + service._persist_last_processed_update_id(11) + service._persist_last_processed_update_id(10) + + entry = service.state.get_entry(TELEGRAM_RUNTIME_BUCKET, TELEGRAM_RUNTIME_OWNER_ID) + self.assertEqual(entry["last_update_id"], 11) + self.assertEqual(service._last_update_id, 11) + async def test_schedule_update_processing_allows_parallel_chats(self): service = self._build_service() release_first_chat = asyncio.Event()