🧵 refactor(telegram): retomar updates e isolar io bloqueante

Persiste o ultimo update_id processado pelo satellite e retoma o polling a partir desse cursor no restart, usando a idempotencia por mensagem como rede de seguranca para reentregas.

Move o turno bloqueante do orquestrador para worker thread com asyncio.to_thread, mantendo no loop async apenas o que e realmente assincrono e endurecendo o backend de estado em memoria para acesso concorrente com RLock.

Amplia a cobertura do satellite com testes para cursor persistido, bootstrap inicial, atualizacao monotona do update_id e offload do processamento bloqueante, mantendo a suite completa verde com 218 testes.
main
parent 2ef283b170
commit 6f69094412

@ -29,6 +29,9 @@ TELEGRAM_MESSAGE_SAFE_LIMIT = 3800
TELEGRAM_MAX_CONCURRENT_CHATS = 8 TELEGRAM_MAX_CONCURRENT_CHATS = 8
TELEGRAM_IDEMPOTENCY_BUCKET = "telegram_processed_messages" TELEGRAM_IDEMPOTENCY_BUCKET = "telegram_processed_messages"
TELEGRAM_IDEMPOTENCY_CACHE_LIMIT = 100 TELEGRAM_IDEMPOTENCY_CACHE_LIMIT = 100
# Bucket name passed to the state repository when reading/writing the polling cursor.
TELEGRAM_RUNTIME_BUCKET = "telegram_runtime_state"
# Key used for the single runtime entry inside that bucket (passed where a user id is expected).
TELEGRAM_RUNTIME_OWNER_ID = 0
# Days added to the write timestamp to compute the cursor entry's "expires_at" value.
TELEGRAM_RUNTIME_CURSOR_TTL_DAYS = 30
def _split_telegram_text(text: str, limit: int = TELEGRAM_MESSAGE_SAFE_LIMIT) -> List[str]: def _split_telegram_text(text: str, limit: int = TELEGRAM_MESSAGE_SAFE_LIMIT) -> List[str]:
@ -244,6 +247,33 @@ class TelegramSatelliteService:
}, },
) )
def _get_runtime_state(self) -> dict:
    """Return the stored runtime entry, or an empty dict when there is none.

    Any value that is not a dict (including None) is normalised to {} so that
    callers can use .get() without further checks.
    """
    stored = self.state.get_entry(TELEGRAM_RUNTIME_BUCKET, TELEGRAM_RUNTIME_OWNER_ID)
    if not isinstance(stored, dict):
        return {}
    return stored
def _persist_last_processed_update_id(self, update_id: int) -> None:
    """Advance the persisted polling cursor to update_id, never moving it backwards.

    Negative ids are ignored. When the stored cursor is already at or beyond
    update_id, nothing is written and only the in-memory cursor is synced to the
    stored value. Otherwise a new entry is written with the id, the write
    timestamp (seconds precision) and an expiry TELEGRAM_RUNTIME_CURSOR_TTL_DAYS
    days later.

    NOTE(review): the cursor tracks the highest id handed to this method. If
    updates can complete out of order, a lower id still in flight when the
    process stops would fall below the resume offset - confirm the idempotency
    layer / product requirements make that acceptable.
    """
    if update_id < 0:
        return

    stored_id = self._get_runtime_state().get("last_update_id")
    already_covered = isinstance(stored_id, int) and stored_id >= update_id
    if already_covered:
        # Keep the in-memory cursor in step with what is already persisted.
        self._last_update_id = max(self._last_update_id, stored_id)
        return

    timestamp = utc_now().replace(microsecond=0)
    self.state.set_entry(
        TELEGRAM_RUNTIME_BUCKET,
        TELEGRAM_RUNTIME_OWNER_ID,
        {
            "last_update_id": update_id,
            "updated_at": timestamp,
            "expires_at": timestamp + timedelta(days=TELEGRAM_RUNTIME_CURSOR_TTL_DAYS),
        },
    )
    self._last_update_id = max(self._last_update_id, update_id)
async def _schedule_update_processing( async def _schedule_update_processing(
self, self,
session: aiohttp.ClientSession, session: aiohttp.ClientSession,
@ -328,9 +358,16 @@ class TelegramSatelliteService:
async def _initialize_offset(self, session: aiohttp.ClientSession) -> int | None: async def _initialize_offset(self, session: aiohttp.ClientSession) -> int | None:
""" """
Descarta backlog pendente no startup para evitar respostas repetidas apos restart. Retoma o polling a partir do ultimo update persistido.
Retorna o offset inicial seguro para o loop principal. Sem cursor salvo, faz um bootstrap conservador e registra o ponto inicial.
""" """
runtime_state = self._get_runtime_state()
last_update_id = runtime_state.get("last_update_id")
if isinstance(last_update_id, int) and last_update_id >= 0:
self._last_update_id = last_update_id
logger.info("Retomando polling do Telegram a partir do update_id persistido %s.", last_update_id)
return last_update_id + 1
payload: Dict[str, Any] = { payload: Dict[str, Any] = {
"timeout": 0, "timeout": 0,
"limit": 100, "limit": 100,
@ -350,8 +387,11 @@ class TelegramSatelliteService:
if last_id < 0: if last_id < 0:
return None return None
self._last_update_id = last_id self._persist_last_processed_update_id(last_id)
logger.info("Startup com backlog descartado: %s update(s) anteriores ignorados.", len(updates)) logger.info(
"Bootstrap inicial do Telegram sem cursor persistido: %s update(s) anteriores ignorados.",
len(updates),
)
return last_id + 1 return last_id + 1
async def _get_updates( async def _get_updates(
@ -421,6 +461,9 @@ class TelegramSatelliteService:
answer = "Nao consegui processar sua solicitacao agora. Tente novamente em instantes." answer = "Nao consegui processar sua solicitacao agora. Tente novamente em instantes."
self._store_processed_update(update=update, answer=answer) self._store_processed_update(update=update, answer=answer)
update_id = update.get("update_id")
if isinstance(update_id, int):
self._persist_last_processed_update_id(update_id)
await self._send_message(session=session, chat_id=chat_id, text=answer) await self._send_message(session=session, chat_id=chat_id, text=answer)
async def _send_message( async def _send_message(
@ -449,6 +492,34 @@ class TelegramSatelliteService:
image_attachments: List[Dict[str, Any]] | None = None, image_attachments: List[Dict[str, Any]] | None = None,
) -> str: ) -> str:
"""Encaminha mensagem ao orquestrador com usuario identificado e retorna resposta.""" """Encaminha mensagem ao orquestrador com usuario identificado e retorna resposta."""
message_text = text
if image_attachments:
image_message = await self._build_orchestration_message_from_image(
caption=text,
image_attachments=image_attachments,
)
if self._is_image_analysis_failure_message(image_message):
return image_message
message_text = image_message
return await asyncio.to_thread(
self._run_blocking_orchestration_turn,
message_text=message_text,
sender=sender,
chat_id=chat_id,
)
def _run_blocking_orchestration_turn(
self,
*,
message_text: str,
sender: Dict[str, Any],
chat_id: int,
) -> str:
"""
Executa o turno do orquestrador fora do loop async principal.
Isso isola sessoes SQLAlchemy sincronas e outras operacoes bloqueantes.
"""
tools_db = SessionLocal() tools_db = SessionLocal()
mock_db = SessionMockLocal() mock_db = SessionMockLocal()
try: try:
@ -466,18 +537,11 @@ class TelegramSatelliteService:
username=username, username=username,
) )
message_text = text service = OrquestradorService(
if image_attachments: tools_db,
image_message = await self._build_orchestration_message_from_image( state_repository=self.state,
caption=text, )
image_attachments=image_attachments, return asyncio.run(service.handle_message(message=message_text, user_id=user.id))
)
if self._is_image_analysis_failure_message(image_message):
return image_message
message_text = image_message
service = OrquestradorService(tools_db)
return await service.handle_message(message=message_text, user_id=user.id)
finally: finally:
tools_db.close() tools_db.close()
mock_db.close() mock_db.close()

@ -1,6 +1,7 @@
from datetime import datetime, timedelta from datetime import datetime, timedelta
from app.core.time_utils import utc_now from threading import RLock
from app.core.time_utils import utc_now
from app.services.orchestration.conversation_state_repository import ConversationStateRepository from app.services.orchestration.conversation_state_repository import ConversationStateRepository
@ -8,6 +9,7 @@ from app.services.orchestration.conversation_state_repository import Conversatio
# Serve como fallback simples para desenvolvimento e testes. # Serve como fallback simples para desenvolvimento e testes.
class ConversationStateStore(ConversationStateRepository): class ConversationStateStore(ConversationStateRepository):
def __init__(self) -> None: def __init__(self) -> None:
self._lock = RLock()
self.user_contexts: dict[int, dict] = {} self.user_contexts: dict[int, dict] = {}
self.pending_review_confirmations: dict[int, dict] = {} self.pending_review_confirmations: dict[int, dict] = {}
self.pending_review_drafts: dict[int, dict] = {} self.pending_review_drafts: dict[int, dict] = {}
@ -20,68 +22,74 @@ class ConversationStateStore(ConversationStateRepository):
self.pending_rental_drafts: dict[int, dict] = {} self.pending_rental_drafts: dict[int, dict] = {}
self.pending_rental_selections: dict[int, dict] = {} self.pending_rental_selections: dict[int, dict] = {}
self.telegram_processed_messages: dict[int, dict] = {} self.telegram_processed_messages: dict[int, dict] = {}
self.telegram_runtime_state: dict[int, dict] = {}
def upsert_user_context(self, user_id: int | None, ttl_minutes: int) -> None: def upsert_user_context(self, user_id: int | None, ttl_minutes: int) -> None:
if user_id is None: if user_id is None:
return return
now = utc_now() with self._lock:
context = self.user_contexts.get(user_id) now = utc_now()
if context and context["expires_at"] >= now: context = self.user_contexts.get(user_id)
context["expires_at"] = now + timedelta(minutes=ttl_minutes) if context and context["expires_at"] >= now:
return context["expires_at"] = now + timedelta(minutes=ttl_minutes)
self.user_contexts[user_id] = { return
"active_domain": "general", self.user_contexts[user_id] = {
"active_task": None, "active_domain": "general",
"generic_memory": {}, "active_task": None,
"shared_memory": {}, "generic_memory": {},
"collected_slots": {}, "shared_memory": {},
"flow_snapshots": {}, "collected_slots": {},
"last_tool_result": None, "flow_snapshots": {},
"order_queue": [], "last_tool_result": None,
"pending_order_selection": None, "order_queue": [],
"pending_switch": None, "pending_order_selection": None,
"last_stock_results": [], "pending_switch": None,
"selected_vehicle": None, "last_stock_results": [],
"last_rental_results": [], "selected_vehicle": None,
"selected_rental_vehicle": None, "last_rental_results": [],
"expires_at": now + timedelta(minutes=ttl_minutes), "selected_rental_vehicle": None,
} "expires_at": now + timedelta(minutes=ttl_minutes),
}
def get_user_context(self, user_id: int | None) -> dict | None: def get_user_context(self, user_id: int | None) -> dict | None:
if user_id is None: if user_id is None:
return None return None
context = self.user_contexts.get(user_id) with self._lock:
if not context: context = self.user_contexts.get(user_id)
return None if not context:
if context["expires_at"] < utc_now(): return None
self.user_contexts.pop(user_id, None) if context["expires_at"] < utc_now():
return None self.user_contexts.pop(user_id, None)
return context return None
return context
def save_user_context(self, user_id: int | None, context: dict) -> None: def save_user_context(self, user_id: int | None, context: dict) -> None:
if user_id is None or not isinstance(context, dict): if user_id is None or not isinstance(context, dict):
return return
self.user_contexts[user_id] = context with self._lock:
self.user_contexts[user_id] = context
def get_entry(self, bucket: str, user_id: int | None, *, expire: bool = False) -> dict | None: def get_entry(self, bucket: str, user_id: int | None, *, expire: bool = False) -> dict | None:
if user_id is None: if user_id is None:
return None return None
entries = getattr(self, bucket) with self._lock:
entry = entries.get(user_id) entries = getattr(self, bucket)
if not entry: entry = entries.get(user_id)
return None if not entry:
if expire and entry.get("expires_at") and entry["expires_at"] < utc_now(): return None
entries.pop(user_id, None) if expire and entry.get("expires_at") and entry["expires_at"] < utc_now():
return None entries.pop(user_id, None)
return entry return None
return entry
def set_entry(self, bucket: str, user_id: int | None, value: dict) -> None: def set_entry(self, bucket: str, user_id: int | None, value: dict) -> None:
if user_id is None: if user_id is None:
return return
getattr(self, bucket)[user_id] = value with self._lock:
getattr(self, bucket)[user_id] = value
def pop_entry(self, bucket: str, user_id: int | None) -> dict | None: def pop_entry(self, bucket: str, user_id: int | None) -> dict | None:
if user_id is None: if user_id is None:
return None return None
return getattr(self, bucket).pop(user_id, None) with self._lock:
return getattr(self, bucket).pop(user_id, None)

@ -5,7 +5,11 @@ from unittest.mock import AsyncMock, patch
from fastapi import HTTPException from fastapi import HTTPException
from app.integrations.telegram_satellite_service import TelegramSatelliteService from app.integrations.telegram_satellite_service import (
TELEGRAM_RUNTIME_BUCKET,
TELEGRAM_RUNTIME_OWNER_ID,
TelegramSatelliteService,
)
from app.services.orchestration.conversation_state_store import ConversationStateStore from app.services.orchestration.conversation_state_store import ConversationStateStore
@ -14,6 +18,30 @@ class _DummySession:
return None return None
class _FakeTelegramResponse:
    """Test double for an HTTP response: an async context manager with a canned JSON body."""

    def __init__(self, payload):
        self.payload = payload

    async def json(self):
        """Return the payload supplied at construction time, unchanged."""
        return self.payload

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc, tb):
        # A falsy return value lets exceptions raised inside the block propagate.
        return None
class _FakeTelegramSession:
    """Test double for the HTTP session: records post() calls and answers with a canned payload."""

    def __init__(self, payload):
        self.calls = []
        self.payload = payload

    def post(self, url, json):
        """Record the (url, json) pair and return a fake response wrapping the payload."""
        recorded_call = (url, json)
        self.calls.append(recorded_call)
        return _FakeTelegramResponse(self.payload)
class TelegramMultimodalTests(unittest.IsolatedAsyncioTestCase): class TelegramMultimodalTests(unittest.IsolatedAsyncioTestCase):
def _build_service(self) -> TelegramSatelliteService: def _build_service(self) -> TelegramSatelliteService:
service = TelegramSatelliteService( service = TelegramSatelliteService(
@ -115,6 +143,26 @@ class TelegramMultimodalTests(unittest.IsolatedAsyncioTestCase):
self.assertIn("marca d'agua SysaltiIA visivel", answer) self.assertIn("marca d'agua SysaltiIA visivel", answer)
self.assertFalse(orchestrator_cls.return_value.handle_message.await_count) self.assertFalse(orchestrator_cls.return_value.handle_message.await_count)
async def test_process_message_offloads_blocking_turn_to_worker_thread(self):
    """_process_message must hand the blocking orchestration turn to asyncio.to_thread.

    Besides the forwarded keyword arguments, the test checks that the callable
    being offloaded is the service's blocking-turn method; without that check any
    function passed to to_thread with the same kwargs would satisfy the test.
    """
    service = self._build_service()

    with patch(
        "app.integrations.telegram_satellite_service.asyncio.to_thread",
        AsyncMock(return_value="ok"),
    ) as to_thread:
        answer = await service._process_message(
            text="quero ver a frota",
            sender={"id": 99, "first_name": "Vitor"},
            chat_id=99,
            image_attachments=[],
        )

    self.assertEqual(answer, "ok")
    self.assertEqual(to_thread.await_count, 1)

    offloaded_call = to_thread.await_args
    # Bound methods compare equal when they wrap the same function on the same instance.
    self.assertEqual(offloaded_call.args, (service._run_blocking_orchestration_turn,))
    self.assertEqual(offloaded_call.kwargs["message_text"], "quero ver a frota")
    self.assertEqual(offloaded_call.kwargs["chat_id"], 99)
    self.assertEqual(offloaded_call.kwargs["sender"]["id"], 99)
async def test_handle_update_masks_sensitive_domain_error_in_logs(self): async def test_handle_update_masks_sensitive_domain_error_in_logs(self):
service = self._build_service() service = self._build_service()
update = { update = {
@ -217,6 +265,71 @@ class TelegramMultimodalTests(unittest.IsolatedAsyncioTestCase):
self.assertEqual(send_message.await_args_list[0].kwargs["text"], "Resposta 1") self.assertEqual(send_message.await_args_list[0].kwargs["text"], "Resposta 1")
self.assertEqual(send_message.await_args_list[1].kwargs["text"], "Resposta 2") self.assertEqual(send_message.await_args_list[1].kwargs["text"], "Resposta 2")
async def test_initialize_offset_uses_persisted_cursor(self):
    """With a cursor already stored, the initial offset is the stored id plus one."""
    service = self._build_service()
    persisted_cursor = {"last_update_id": 41}
    service.state.set_entry(TELEGRAM_RUNTIME_BUCKET, TELEGRAM_RUNTIME_OWNER_ID, persisted_cursor)

    # The empty namespace exposes no session methods, so (presumably) any attempt
    # to reach the network path would fail instead of silently succeeding.
    bare_session = SimpleNamespace()
    offset = await service._initialize_offset(session=bare_session)

    self.assertEqual(offset, 42)
    self.assertEqual(service._last_update_id, 41)
async def test_initialize_offset_bootstraps_cursor_once_when_missing(self):
    """Without a stored cursor, one fetch is made and the highest pending id is persisted."""
    service = self._build_service()
    pending_updates = [{"update_id": 5}, {"update_id": 6}]
    session = _FakeTelegramSession({"ok": True, "result": pending_updates})

    offset = await service._initialize_offset(session=session)
    stored = service.state.get_entry(TELEGRAM_RUNTIME_BUCKET, TELEGRAM_RUNTIME_OWNER_ID)

    self.assertEqual(offset, 7)
    self.assertEqual(service._last_update_id, 6)
    self.assertEqual(stored["last_update_id"], 6)
    self.assertEqual(len(session.calls), 1)
async def test_handle_update_persists_runtime_cursor(self):
    """Handling an update stores its update_id as the runtime cursor."""
    service = self._build_service()
    message = {
        "message_id": 88,
        "chat": {"id": 99},
        "from": {"id": 99},
        "text": "status do pedido",
    }
    update = {"update_id": 14, "message": message}

    # Stub out attachment extraction, orchestration and delivery so only the
    # cursor bookkeeping of _handle_update is exercised.
    no_attachments = patch.object(service, "_extract_image_attachments", AsyncMock(return_value=[]))
    canned_answer = patch.object(service, "_process_message", AsyncMock(return_value="Pedido encontrado."))
    silent_send = patch.object(service, "_send_message", AsyncMock())
    with no_attachments, canned_answer, silent_send:
        await service._handle_update(session=SimpleNamespace(), update=update)

    stored = service.state.get_entry(TELEGRAM_RUNTIME_BUCKET, TELEGRAM_RUNTIME_OWNER_ID)
    self.assertEqual(stored["last_update_id"], 14)
async def test_persist_last_processed_update_id_keeps_highest_seen_value(self):
    """Persisting a lower id after a higher one leaves the cursor at the higher id."""
    service = self._build_service()

    # Highest id first, then a stale one that must not move the cursor back.
    for update_id in (11, 10):
        service._persist_last_processed_update_id(update_id)

    stored = service.state.get_entry(TELEGRAM_RUNTIME_BUCKET, TELEGRAM_RUNTIME_OWNER_ID)
    self.assertEqual(stored["last_update_id"], 11)
    self.assertEqual(service._last_update_id, 11)
async def test_schedule_update_processing_allows_parallel_chats(self): async def test_schedule_update_processing_allows_parallel_chats(self):
service = self._build_service() service = self._build_service()
release_first_chat = asyncio.Event() release_first_chat = asyncio.Event()

Loading…
Cancel
Save