You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
242 lines
8.8 KiB
Python
242 lines
8.8 KiB
Python
import asyncio
|
|
import logging
|
|
import os
|
|
import tempfile
|
|
from typing import Any, Dict, List
|
|
|
|
import aiohttp
|
|
from fastapi import HTTPException
|
|
|
|
from app.core.settings import settings
|
|
from app.db.database import SessionLocal
|
|
from app.db.mock_database import SessionMockLocal
|
|
from app.services.ai.llm_service import LLMService
|
|
from app.services.orchestration.orquestrador_service import OrquestradorService
|
|
from app.services.user.user_service import UserService
|
|
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def _ensure_supported_runtime_configuration() -> None:
|
|
"""
|
|
Em producao, o satelite nao deve operar com estado conversacional apenas em memoria,
|
|
porque isso quebra continuidade entre reinicios e instancias.
|
|
"""
|
|
if settings.environment == "production" and settings.conversation_state_backend == "memory":
|
|
raise RuntimeError(
|
|
"Telegram satellite em producao exige conversation_state_backend diferente de memory."
|
|
)
|
|
|
|
|
|
def _acquire_single_instance_lock(lock_name: str):
|
|
"""
|
|
Garante apenas uma instancia local do satelite por lock de arquivo.
|
|
Retorna o handle do arquivo para manter o lock vivo.
|
|
"""
|
|
lock_path = os.path.join(tempfile.gettempdir(), lock_name)
|
|
handle = open(lock_path, "a+")
|
|
|
|
try:
|
|
import msvcrt
|
|
|
|
try:
|
|
msvcrt.locking(handle.fileno(), msvcrt.LK_NBLCK, 1)
|
|
except OSError:
|
|
handle.close()
|
|
raise RuntimeError(
|
|
"Ja existe uma instancia do Telegram satellite em execucao neste host."
|
|
)
|
|
return handle
|
|
except ImportError:
|
|
import fcntl
|
|
|
|
try:
|
|
fcntl.flock(handle.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
|
|
except OSError:
|
|
handle.close()
|
|
raise RuntimeError(
|
|
"Ja existe uma instancia do Telegram satellite em execucao neste host."
|
|
)
|
|
return handle
|
|
|
|
|
|
class TelegramSatelliteService:
|
|
"""
|
|
Interface satelite para Telegram.
|
|
Processa mensagens direto no OrquestradorService e publica respostas no chat.
|
|
"""
|
|
|
|
def __init__(self, token: str):
|
|
"""Configura cliente Telegram com URL base e timeouts padrao."""
|
|
self.base_url = f"https://api.telegram.org/bot{token}"
|
|
self.polling_timeout = settings.telegram_polling_timeout
|
|
self.request_timeout = settings.telegram_request_timeout
|
|
self._last_update_id = -1
|
|
|
|
async def run(self) -> None:
|
|
"""Inicia loop de long polling para consumir atualizacoes do bot."""
|
|
logger.info("Telegram satellite iniciado com long polling.")
|
|
await self._warmup_llm()
|
|
offset = None
|
|
timeout = aiohttp.ClientTimeout(total=self.request_timeout)
|
|
|
|
async with aiohttp.ClientSession(timeout=timeout) as session:
|
|
offset = await self._initialize_offset(session=session)
|
|
while True:
|
|
updates = await self._get_updates(session=session, offset=offset)
|
|
for update in updates:
|
|
update_id = update.get("update_id")
|
|
if not isinstance(update_id, int):
|
|
continue
|
|
if update_id <= self._last_update_id:
|
|
continue
|
|
self._last_update_id = update_id
|
|
offset = update_id + 1
|
|
await self._handle_update(session=session, update=update)
|
|
|
|
async def _warmup_llm(self) -> None:
|
|
"""Preaquece o LLM no startup do satelite para reduzir latencia do primeiro usuario."""
|
|
try:
|
|
await LLMService().warmup()
|
|
logger.info("Warmup de LLM concluido no Telegram satellite.")
|
|
except Exception:
|
|
logger.exception("Falha no warmup de LLM do Telegram satellite.")
|
|
|
|
async def _initialize_offset(self, session: aiohttp.ClientSession) -> int | None:
|
|
"""
|
|
Descarta backlog pendente no startup para evitar respostas repetidas apos restart.
|
|
Retorna o offset inicial seguro para o loop principal.
|
|
"""
|
|
payload: Dict[str, Any] = {
|
|
"timeout": 0,
|
|
"limit": 100,
|
|
"allowed_updates": ["message"],
|
|
}
|
|
async with session.post(f"{self.base_url}/getUpdates", json=payload) as response:
|
|
data = await response.json()
|
|
if not data.get("ok"):
|
|
logger.warning("Falha ao inicializar offset no Telegram: %s", data)
|
|
return None
|
|
|
|
updates = data.get("result", [])
|
|
if not updates:
|
|
return None
|
|
|
|
last_id = max(u.get("update_id", -1) for u in updates if isinstance(u.get("update_id"), int))
|
|
if last_id < 0:
|
|
return None
|
|
|
|
self._last_update_id = last_id
|
|
logger.info("Startup com backlog descartado: %s update(s) anteriores ignorados.", len(updates))
|
|
return last_id + 1
|
|
|
|
async def _get_updates(
|
|
self,
|
|
session: aiohttp.ClientSession,
|
|
offset: int | None,
|
|
) -> List[Dict[str, Any]]:
|
|
"""Busca novas mensagens no Telegram a partir do offset informado."""
|
|
payload: Dict[str, Any] = {
|
|
"timeout": self.polling_timeout,
|
|
"limit": 100,
|
|
"allowed_updates": ["message"],
|
|
}
|
|
if offset is not None:
|
|
payload["offset"] = offset
|
|
|
|
async with session.post(f"{self.base_url}/getUpdates", json=payload) as response:
|
|
data = await response.json()
|
|
if not data.get("ok"):
|
|
logger.warning("Falha em getUpdates: %s", data)
|
|
return []
|
|
return data.get("result", [])
|
|
|
|
async def _handle_update(
|
|
self,
|
|
session: aiohttp.ClientSession,
|
|
update: Dict[str, Any],
|
|
) -> None:
|
|
"""Processa uma atualizacao recebida e envia resposta ao chat."""
|
|
message = update.get("message", {})
|
|
text = message.get("text")
|
|
chat = message.get("chat", {})
|
|
chat_id = chat.get("id")
|
|
sender = message.get("from", {})
|
|
|
|
if not text or not chat_id:
|
|
return
|
|
|
|
try:
|
|
answer = await self._process_message(text=text, sender=sender, chat_id=chat_id)
|
|
except HTTPException as exc:
|
|
logger.warning("Falha de dominio ao processar mensagem no Telegram: %s", exc.detail)
|
|
answer = str(exc.detail) if exc.detail else "Nao foi possivel concluir a operacao solicitada."
|
|
except Exception:
|
|
logger.exception("Erro ao processar mensagem do Telegram.")
|
|
answer = "Nao consegui processar sua solicitacao agora. Tente novamente em instantes."
|
|
|
|
await self._send_message(session=session, chat_id=chat_id, text=answer)
|
|
|
|
async def _send_message(
|
|
self,
|
|
session: aiohttp.ClientSession,
|
|
chat_id: int,
|
|
text: str,
|
|
) -> None:
|
|
"""Envia mensagem de texto para o chat informado no Telegram."""
|
|
payload = {
|
|
"chat_id": chat_id,
|
|
"text": text,
|
|
}
|
|
async with session.post(f"{self.base_url}/sendMessage", json=payload) as response:
|
|
data = await response.json()
|
|
if not data.get("ok"):
|
|
logger.warning("Falha em sendMessage: %s", data)
|
|
|
|
async def _process_message(self, text: str, sender: Dict[str, Any], chat_id: int) -> str:
|
|
"""Encaminha mensagem ao orquestrador com usuario identificado e retorna resposta."""
|
|
tools_db = SessionLocal()
|
|
mock_db = SessionMockLocal()
|
|
try:
|
|
user_service = UserService(mock_db)
|
|
external_id = str(sender.get("id") or chat_id)
|
|
first_name = (sender.get("first_name") or "").strip()
|
|
last_name = (sender.get("last_name") or "").strip()
|
|
display_name = f"{first_name} {last_name}".strip() or None
|
|
username = sender.get("username")
|
|
|
|
user = user_service.get_or_create(
|
|
channel="telegram",
|
|
external_id=external_id,
|
|
name=display_name,
|
|
username=username,
|
|
)
|
|
|
|
service = OrquestradorService(tools_db)
|
|
return await service.handle_message(message=text, user_id=user.id)
|
|
finally:
|
|
tools_db.close()
|
|
mock_db.close()
|
|
|
|
|
|
async def main() -> None:
|
|
"""Inicializa servico satelite do Telegram e inicia processamento continuo."""
|
|
token = settings.telegram_bot_token
|
|
if not token:
|
|
raise RuntimeError("TELEGRAM_BOT_TOKEN nao configurado.")
|
|
_ensure_supported_runtime_configuration()
|
|
|
|
lock_handle = _acquire_single_instance_lock("orquestrador_telegram_satellite.lock")
|
|
|
|
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(name)s %(message)s")
|
|
service = TelegramSatelliteService(token=token)
|
|
try:
|
|
await service.run()
|
|
finally:
|
|
lock_handle.close()
|
|
|
|
|
|
if __name__ == "__main__":
|
|
asyncio.run(main())
|