"""Telegram satellite: long-polling bridge between a Telegram bot and the orchestrator."""

import asyncio
import logging
import os
import tempfile
from typing import Any, Dict, List, TextIO

import aiohttp
from fastapi import HTTPException

from app.core.settings import settings
from app.db.database import SessionLocal
from app.db.mock_database import SessionMockLocal
from app.services.ai.llm_service import LLMService
from app.services.orchestration.orquestrador_service import OrquestradorService
from app.services.user.user_service import UserService

logger = logging.getLogger(__name__)

# Maximum number of updates requested per getUpdates call.
_UPDATES_PAGE_SIZE = 100

# Pause (seconds) before polling again after a failed getUpdates call, so a
# persistent failure does not turn the polling loop into a busy loop.
_RETRY_DELAY_SECONDS = 5

# Failures treated as transient when talking to the Telegram HTTP API:
# aiohttp client errors (including non-JSON content types), timeouts, and
# invalid JSON bodies (json.JSONDecodeError is a ValueError).
_TRANSIENT_HTTP_ERRORS = (aiohttp.ClientError, asyncio.TimeoutError, ValueError)


def _ensure_supported_runtime_configuration() -> None:
    """
    Reject an unsupported production configuration.

    In production the satellite must not keep conversation state only in
    memory, because that breaks continuity across restarts and instances.

    Raises:
        RuntimeError: if the environment is "production" and the conversation
            state backend is "memory".
    """
    if settings.environment == "production" and settings.conversation_state_backend == "memory":
        raise RuntimeError(
            "Telegram satellite em producao exige conversation_state_backend diferente de memory."
        )


def _acquire_single_instance_lock(lock_name: str) -> TextIO:
    """
    Ensure a single local satellite instance through a file lock.

    The lock file lives in the system temporary directory. ``msvcrt`` is used
    when it can be imported, otherwise ``fcntl``.

    Args:
        lock_name: File name of the lock inside the temporary directory.

    Returns:
        The open file handle; it must be kept open to keep the lock alive.

    Raises:
        RuntimeError: if the lock cannot be acquired without blocking.
    """
    lock_path = os.path.join(tempfile.gettempdir(), lock_name)
    handle = open(lock_path, "a+")
    try:
        try:
            import msvcrt
        except ImportError:
            import fcntl

            fcntl.flock(handle.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
        else:
            msvcrt.locking(handle.fileno(), msvcrt.LK_NBLCK, 1)
    except OSError as exc:
        handle.close()
        raise RuntimeError(
            "Ja existe uma instancia do Telegram satellite em execucao neste host."
        ) from exc
    except Exception:
        # Any other failure (e.g. neither locking module is importable) must
        # not leak the open handle.
        handle.close()
        raise
    return handle


class TelegramSatelliteService:
    """
    Satellite interface for Telegram.

    Processes messages directly through OrquestradorService and publishes the
    answers back to the chat.
    """

    def __init__(self, token: str):
        """Configure the Telegram client base URL and timeouts from settings."""
        self.base_url = f"https://api.telegram.org/bot{token}"
        self.polling_timeout = settings.telegram_polling_timeout
        self.request_timeout = settings.telegram_request_timeout
        # Highest update_id seen so far; used to skip duplicated updates.
        self._last_update_id = -1

    async def run(self) -> None:
        """Start the long-polling loop that consumes bot updates; runs until cancelled."""
        logger.info("Telegram satellite iniciado com long polling.")
        await self._warmup_llm()

        # NOTE(review): this total timeout also applies to the long-poll
        # request; presumably telegram_request_timeout is configured larger
        # than telegram_polling_timeout -- confirm in settings.
        timeout = aiohttp.ClientTimeout(total=self.request_timeout)
        async with aiohttp.ClientSession(timeout=timeout) as session:
            offset = await self._initialize_offset(session=session)
            while True:
                updates = await self._get_updates(session=session, offset=offset)
                for update in updates:
                    update_id = update.get("update_id")
                    if not isinstance(update_id, int) or update_id <= self._last_update_id:
                        continue
                    # Advance the offset before handling so the update is not
                    # fetched again on the next poll.
                    self._last_update_id = update_id
                    offset = update_id + 1
                    await self._handle_update(session=session, update=update)

    async def _warmup_llm(self) -> None:
        """Warm up the LLM at startup; a failure is logged and does not abort startup."""
        try:
            await LLMService().warmup()
            logger.info("Warmup de LLM concluido no Telegram satellite.")
        except Exception:
            logger.exception("Falha no warmup de LLM do Telegram satellite.")

    async def _initialize_offset(self, session: aiohttp.ClientSession) -> int | None:
        """
        Discard the pending backlog at startup to avoid repeated answers after a restart.

        Pages through getUpdates until a page comes back with fewer updates
        than requested, so a backlog larger than one page is skipped as well.
        Network errors are not handled here and propagate to the caller.

        Returns:
            The first offset the main loop should poll from, or None when
            there was no backlog or the first request was rejected.
        """
        offset: int | None = None
        discarded = 0
        while True:
            payload: Dict[str, Any] = {
                "timeout": 0,
                "limit": _UPDATES_PAGE_SIZE,
                "allowed_updates": ["message"],
            }
            if offset is not None:
                payload["offset"] = offset

            async with session.post(f"{self.base_url}/getUpdates", json=payload) as response:
                data = await response.json()

            if not data.get("ok"):
                logger.warning("Falha ao inicializar offset no Telegram: %s", data)
                break

            updates = data.get("result", [])
            # default=-1 keeps max() from raising when no update carries an
            # integer update_id (including the empty-page case).
            last_id = max(
                (u["update_id"] for u in updates if isinstance(u.get("update_id"), int)),
                default=-1,
            )
            if last_id < 0:
                break

            self._last_update_id = last_id
            offset = last_id + 1
            discarded += len(updates)
            if len(updates) < _UPDATES_PAGE_SIZE:
                break

        if discarded:
            logger.info(
                "Startup com backlog descartado: %s update(s) anteriores ignorados.",
                discarded,
            )
        return offset

    async def _get_updates(
        self,
        session: aiohttp.ClientSession,
        offset: int | None,
    ) -> List[Dict[str, Any]]:
        """
        Fetch new updates from Telegram starting at the given offset.

        Transient HTTP failures and rejected requests are logged and followed
        by a short pause, so the caller's loop can simply poll again.

        Returns:
            The list of updates, or an empty list when the call failed.
        """
        payload: Dict[str, Any] = {
            "timeout": self.polling_timeout,
            "limit": _UPDATES_PAGE_SIZE,
            "allowed_updates": ["message"],
        }
        if offset is not None:
            payload["offset"] = offset

        try:
            async with session.post(f"{self.base_url}/getUpdates", json=payload) as response:
                data = await response.json()
        except _TRANSIENT_HTTP_ERRORS as exc:
            # Log only the exception type: aiohttp error messages may embed
            # the request URL, which contains the bot token.
            logger.warning("Falha de rede em getUpdates: %s", type(exc).__name__)
            await asyncio.sleep(_RETRY_DELAY_SECONDS)
            return []

        if not data.get("ok"):
            logger.warning("Falha em getUpdates: %s", data)
            await asyncio.sleep(_RETRY_DELAY_SECONDS)
            return []
        return data.get("result", [])

    async def _handle_update(
        self,
        session: aiohttp.ClientSession,
        update: Dict[str, Any],
    ) -> None:
        """
        Process one received update and send the answer to its chat.

        Updates without text or without a chat id are ignored. Processing
        errors are converted into a user-facing message instead of raising.
        """
        # `or {}` also covers keys that are present but null.
        message = update.get("message") or {}
        text = message.get("text")
        chat_id = (message.get("chat") or {}).get("id")
        sender = message.get("from") or {}
        if not text or not chat_id:
            return

        try:
            answer = await self._process_message(text=text, sender=sender, chat_id=chat_id)
        except HTTPException as exc:
            logger.warning("Falha de dominio ao processar mensagem no Telegram: %s", exc.detail)
            answer = str(exc.detail) if exc.detail else "Nao foi possivel concluir a operacao solicitada."
        except Exception:
            logger.exception("Erro ao processar mensagem do Telegram.")
            answer = "Nao consegui processar sua solicitacao agora. Tente novamente em instantes."

        await self._send_message(session=session, chat_id=chat_id, text=answer)

    async def _send_message(
        self,
        session: aiohttp.ClientSession,
        chat_id: int,
        text: str,
    ) -> None:
        """
        Send a text message to the given Telegram chat.

        Delivery is best effort: rejected requests and transient HTTP failures
        are logged and never raised, so one failed send cannot stop polling.
        """
        payload = {
            "chat_id": chat_id,
            "text": text,
        }
        try:
            async with session.post(f"{self.base_url}/sendMessage", json=payload) as response:
                data = await response.json()
        except _TRANSIENT_HTTP_ERRORS as exc:
            # Exception type only; see _get_updates for the reason.
            logger.warning("Falha de rede em sendMessage: %s", type(exc).__name__)
            return

        if not data.get("ok"):
            logger.warning("Falha em sendMessage: %s", data)

    async def _process_message(self, text: str, sender: Dict[str, Any], chat_id: int) -> str:
        """
        Forward the message to the orchestrator on behalf of the identified user.

        The user is looked up (or created) on the "telegram" channel using the
        sender id, falling back to the chat id when the sender has none.

        Returns:
            The answer produced by the orchestrator.
        """
        sender = sender or {}
        tools_db = SessionLocal()
        try:
            mock_db = SessionMockLocal()
            try:
                first_name = (sender.get("first_name") or "").strip()
                last_name = (sender.get("last_name") or "").strip()
                user = UserService(mock_db).get_or_create(
                    channel="telegram",
                    external_id=str(sender.get("id") or chat_id),
                    name=f"{first_name} {last_name}".strip() or None,
                    username=sender.get("username"),
                )
                service = OrquestradorService(tools_db)
                return await service.handle_message(message=text, user_id=user.id)
            finally:
                mock_db.close()
        finally:
            # Nested finally blocks guarantee both sessions are closed even
            # when opening or closing the other one fails.
            tools_db.close()


async def main() -> None:
    """
    Initialise the Telegram satellite service and run it continuously.

    Raises:
        RuntimeError: if the bot token is not configured, the runtime
            configuration is unsupported, or another instance holds the lock.
    """
    token = settings.telegram_bot_token
    if not token:
        raise RuntimeError("TELEGRAM_BOT_TOKEN nao configurado.")

    _ensure_supported_runtime_configuration()
    lock_handle = _acquire_single_instance_lock("orquestrador_telegram_satellite.lock")
    try:
        logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(name)s %(message)s")
        service = TelegramSatelliteService(token=token)
        await service.run()
    finally:
        lock_handle.close()


if __name__ == "__main__":
    asyncio.run(main())