You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
orquestrador/app/integrations/telegram_satellite_service.py

296 lines
10 KiB
Python

import asyncio
import logging
import os
import tempfile
from typing import Any, Dict, List
import aiohttp
from fastapi import HTTPException
from app.core.settings import settings
from app.db.database import SessionLocal
from app.db.mock_database import SessionMockLocal
from app.services.ai.llm_service import LLMService
from app.services.orchestration.orquestrador_service import OrquestradorService
from app.services.user.user_service import UserService
logger = logging.getLogger(__name__)
TELEGRAM_MESSAGE_SAFE_LIMIT = 3800
def _split_telegram_text(text: str, limit: int = TELEGRAM_MESSAGE_SAFE_LIMIT) -> List[str]:
normalized = str(text or "").strip()
if not normalized:
return [""]
if len(normalized) <= limit:
return [normalized]
chunks: List[str] = []
current = ""
def flush_current() -> None:
nonlocal current
if current:
chunks.append(current)
current = ""
paragraphs = normalized.split("\n\n")
for paragraph in paragraphs:
candidate = paragraph if not current else f"{current}\n\n{paragraph}"
if len(candidate) <= limit:
current = candidate
continue
flush_current()
if len(paragraph) <= limit:
current = paragraph
continue
line_buffer = ""
for line in paragraph.split("\n"):
line_candidate = line if not line_buffer else f"{line_buffer}\n{line}"
if len(line_candidate) <= limit:
line_buffer = line_candidate
continue
if line_buffer:
chunks.append(line_buffer)
line_buffer = ""
while len(line) > limit:
chunks.append(line[:limit])
line = line[limit:]
line_buffer = line
if line_buffer:
current = line_buffer
flush_current()
return chunks or [normalized[:limit]]
def _ensure_supported_runtime_configuration() -> None:
"""
Em producao, o satelite nao deve operar com estado conversacional apenas em memoria,
porque isso quebra continuidade entre reinicios e instancias.
"""
if settings.environment == "production" and settings.conversation_state_backend == "memory":
raise RuntimeError(
"Telegram satellite em producao exige conversation_state_backend diferente de memory."
)
def _acquire_single_instance_lock(lock_name: str):
"""
Garante apenas uma instancia local do satelite por lock de arquivo.
Retorna o handle do arquivo para manter o lock vivo.
"""
lock_path = os.path.join(tempfile.gettempdir(), lock_name)
handle = open(lock_path, "a+")
try:
import msvcrt
try:
msvcrt.locking(handle.fileno(), msvcrt.LK_NBLCK, 1)
except OSError:
handle.close()
raise RuntimeError(
"Ja existe uma instancia do Telegram satellite em execucao neste host."
)
return handle
except ImportError:
import fcntl
try:
fcntl.flock(handle.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
except OSError:
handle.close()
raise RuntimeError(
"Ja existe uma instancia do Telegram satellite em execucao neste host."
)
return handle
class TelegramSatelliteService:
"""
Interface satelite para Telegram.
Processa mensagens direto no OrquestradorService e publica respostas no chat.
"""
def __init__(self, token: str):
"""Configura cliente Telegram com URL base e timeouts padrao."""
self.base_url = f"https://api.telegram.org/bot{token}"
self.polling_timeout = settings.telegram_polling_timeout
self.request_timeout = settings.telegram_request_timeout
self._last_update_id = -1
async def run(self) -> None:
"""Inicia loop de long polling para consumir atualizacoes do bot."""
logger.info("Telegram satellite iniciado com long polling.")
await self._warmup_llm()
offset = None
timeout = aiohttp.ClientTimeout(total=self.request_timeout)
async with aiohttp.ClientSession(timeout=timeout) as session:
offset = await self._initialize_offset(session=session)
while True:
updates = await self._get_updates(session=session, offset=offset)
for update in updates:
update_id = update.get("update_id")
if not isinstance(update_id, int):
continue
if update_id <= self._last_update_id:
continue
self._last_update_id = update_id
offset = update_id + 1
await self._handle_update(session=session, update=update)
async def _warmup_llm(self) -> None:
"""Preaquece o LLM no startup do satelite para reduzir latencia do primeiro usuario."""
try:
await LLMService().warmup()
logger.info("Warmup de LLM concluido no Telegram satellite.")
except Exception:
logger.exception("Falha no warmup de LLM do Telegram satellite.")
async def _initialize_offset(self, session: aiohttp.ClientSession) -> int | None:
"""
Descarta backlog pendente no startup para evitar respostas repetidas apos restart.
Retorna o offset inicial seguro para o loop principal.
"""
payload: Dict[str, Any] = {
"timeout": 0,
"limit": 100,
"allowed_updates": ["message"],
}
async with session.post(f"{self.base_url}/getUpdates", json=payload) as response:
data = await response.json()
if not data.get("ok"):
logger.warning("Falha ao inicializar offset no Telegram: %s", data)
return None
updates = data.get("result", [])
if not updates:
return None
last_id = max(u.get("update_id", -1) for u in updates if isinstance(u.get("update_id"), int))
if last_id < 0:
return None
self._last_update_id = last_id
logger.info("Startup com backlog descartado: %s update(s) anteriores ignorados.", len(updates))
return last_id + 1
async def _get_updates(
self,
session: aiohttp.ClientSession,
offset: int | None,
) -> List[Dict[str, Any]]:
"""Busca novas mensagens no Telegram a partir do offset informado."""
payload: Dict[str, Any] = {
"timeout": self.polling_timeout,
"limit": 100,
"allowed_updates": ["message"],
}
if offset is not None:
payload["offset"] = offset
async with session.post(f"{self.base_url}/getUpdates", json=payload) as response:
data = await response.json()
if not data.get("ok"):
logger.warning("Falha em getUpdates: %s", data)
return []
return data.get("result", [])
async def _handle_update(
self,
session: aiohttp.ClientSession,
update: Dict[str, Any],
) -> None:
"""Processa uma atualizacao recebida e envia resposta ao chat."""
message = update.get("message", {})
text = message.get("text")
chat = message.get("chat", {})
chat_id = chat.get("id")
sender = message.get("from", {})
if not text or not chat_id:
return
try:
answer = await self._process_message(text=text, sender=sender, chat_id=chat_id)
except HTTPException as exc:
logger.warning("Falha de dominio ao processar mensagem no Telegram: %s", exc.detail)
answer = str(exc.detail) if exc.detail else "Nao foi possivel concluir a operacao solicitada."
except Exception:
logger.exception("Erro ao processar mensagem do Telegram.")
answer = "Nao consegui processar sua solicitacao agora. Tente novamente em instantes."
await self._send_message(session=session, chat_id=chat_id, text=answer)
async def _send_message(
self,
session: aiohttp.ClientSession,
chat_id: int,
text: str,
) -> None:
"""Envia mensagem de texto para o chat informado no Telegram."""
for chunk in _split_telegram_text(text):
payload = {
"chat_id": chat_id,
"text": chunk,
}
async with session.post(f"{self.base_url}/sendMessage", json=payload) as response:
data = await response.json()
if not data.get("ok"):
logger.warning("Falha em sendMessage: %s", data)
async def _process_message(self, text: str, sender: Dict[str, Any], chat_id: int) -> str:
"""Encaminha mensagem ao orquestrador com usuario identificado e retorna resposta."""
tools_db = SessionLocal()
mock_db = SessionMockLocal()
try:
user_service = UserService(mock_db)
external_id = str(sender.get("id") or chat_id)
first_name = (sender.get("first_name") or "").strip()
last_name = (sender.get("last_name") or "").strip()
display_name = f"{first_name} {last_name}".strip() or None
username = sender.get("username")
user = user_service.get_or_create(
channel="telegram",
external_id=external_id,
name=display_name,
username=username,
)
service = OrquestradorService(tools_db)
return await service.handle_message(message=text, user_id=user.id)
finally:
tools_db.close()
mock_db.close()
async def main() -> None:
"""Inicializa servico satelite do Telegram e inicia processamento continuo."""
token = settings.telegram_bot_token
if not token:
raise RuntimeError("TELEGRAM_BOT_TOKEN nao configurado.")
_ensure_supported_runtime_configuration()
lock_handle = _acquire_single_instance_lock("orquestrador_telegram_satellite.lock")
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(name)s %(message)s")
service = TelegramSatelliteService(token=token)
try:
await service.run()
finally:
lock_handle.close()
if __name__ == "__main__":
asyncio.run(main())