# orquestrador/app/integrations/telegram_satellite_service.py
# (Python, 431 lines, 16 KiB)

import asyncio
import logging
import os
import tempfile
from typing import Any, Dict, List
import aiohttp
from fastapi import HTTPException
from app.core.settings import settings
from app.db.database import SessionLocal
from app.db.mock_database import SessionMockLocal
from app.services.ai.llm_service import (
IMAGE_ANALYSIS_BLOCKING_PREFIXES,
LLMService,
)
from app.services.orchestration.orquestrador_service import OrquestradorService
from app.services.user.user_service import UserService
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
TELEGRAM_MESSAGE_SAFE_LIMIT = 3800
def _split_telegram_text(text: str, limit: int = TELEGRAM_MESSAGE_SAFE_LIMIT) -> List[str]:
normalized = str(text or "").strip()
if not normalized:
return [""]
if len(normalized) <= limit:
return [normalized]
chunks: List[str] = []
current = ""
def flush_current() -> None:
nonlocal current
if current:
chunks.append(current)
current = ""
paragraphs = normalized.split("\n\n")
for paragraph in paragraphs:
candidate = paragraph if not current else f"{current}\n\n{paragraph}"
if len(candidate) <= limit:
current = candidate
continue
flush_current()
if len(paragraph) <= limit:
current = paragraph
continue
line_buffer = ""
for line in paragraph.split("\n"):
line_candidate = line if not line_buffer else f"{line_buffer}\n{line}"
if len(line_candidate) <= limit:
line_buffer = line_candidate
continue
if line_buffer:
chunks.append(line_buffer)
line_buffer = ""
while len(line) > limit:
chunks.append(line[:limit])
line = line[limit:]
line_buffer = line
if line_buffer:
current = line_buffer
flush_current()
return chunks or [normalized[:limit]]
def _ensure_supported_runtime_configuration() -> None:
    """
    Reject an unsupported production setup.

    In production the satellite must not keep conversation state only in
    memory, because that breaks continuity across restarts and instances.

    Raises:
        RuntimeError: If the environment is "production" and the conversation
            state backend is "memory".
    """
    if settings.environment != "production":
        return
    if settings.conversation_state_backend != "memory":
        return
    raise RuntimeError(
        "Telegram satellite em producao exige conversation_state_backend diferente de memory."
    )
def _acquire_single_instance_lock(lock_name: str):
"""
Garante apenas uma instancia local do satelite por lock de arquivo.
Retorna o handle do arquivo para manter o lock vivo.
"""
lock_path = os.path.join(tempfile.gettempdir(), lock_name)
handle = open(lock_path, "a+")
try:
import msvcrt
try:
msvcrt.locking(handle.fileno(), msvcrt.LK_NBLCK, 1)
except OSError:
handle.close()
raise RuntimeError(
"Ja existe uma instancia do Telegram satellite em execucao neste host."
)
return handle
except ImportError:
import fcntl
try:
fcntl.flock(handle.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
except OSError:
handle.close()
raise RuntimeError(
"Ja existe uma instancia do Telegram satellite em execucao neste host."
)
return handle
class TelegramSatelliteService:
    """
    Telegram satellite interface.

    Consumes bot updates through long polling, forwards each message to
    OrquestradorService and publishes the answer back to the chat.
    """

    # Pause after a failed getUpdates call so a persistent failure (network
    # outage, rejected request) does not turn the polling loop into a busy loop.
    _RETRY_DELAY_SECONDS = 5

    def __init__(self, token: str):
        """Build the Bot API base URLs from ``token`` and read the configured timeouts."""
        self.base_url = f"https://api.telegram.org/bot{token}"
        self.file_base_url = f"https://api.telegram.org/file/bot{token}"
        self.polling_timeout = settings.telegram_polling_timeout
        self.request_timeout = settings.telegram_request_timeout
        # Highest update_id already accepted; anything at or below it is skipped.
        self._last_update_id = -1

    async def run(self) -> None:
        """
        Run the long-polling loop that consumes bot updates; never returns normally.

        Communication errors while polling are logged and retried after a
        short pause, and a failure while handling one update is logged without
        stopping the loop. A failure during the initial offset setup still
        propagates to the caller.
        """
        logger.info("Telegram satellite iniciado com long polling.")
        await self._warmup_llm()
        # NOTE(review): this total timeout also bounds the long-poll request, so
        # it presumably has to exceed telegram_polling_timeout - confirm settings.
        timeout = aiohttp.ClientTimeout(total=self.request_timeout)
        async with aiohttp.ClientSession(timeout=timeout) as session:
            offset = await self._initialize_offset(session=session)
            while True:
                try:
                    updates = await self._get_updates(session=session, offset=offset)
                except (aiohttp.ClientError, asyncio.TimeoutError, ValueError):
                    # ValueError covers a response body that is not valid JSON.
                    logger.exception("Falha de comunicacao em getUpdates; tentando novamente.")
                    await asyncio.sleep(self._RETRY_DELAY_SECONDS)
                    continue
                for update in updates:
                    update_id = update.get("update_id")
                    if not isinstance(update_id, int):
                        continue
                    if update_id <= self._last_update_id:
                        continue
                    # Advance before handling so a failing update is not retried.
                    self._last_update_id = update_id
                    offset = update_id + 1
                    try:
                        await self._handle_update(session=session, update=update)
                    except Exception:
                        logger.exception("Erro inesperado ao tratar update %s do Telegram.", update_id)

    async def _warmup_llm(self) -> None:
        """Warm up the LLM at startup; a warmup failure is logged and ignored."""
        try:
            await LLMService().warmup()
            logger.info("Warmup de LLM concluido no Telegram satellite.")
        except Exception:
            logger.exception("Falha no warmup de LLM do Telegram satellite.")

    async def _initialize_offset(self, session: aiohttp.ClientSession) -> int | None:
        """
        Skip the pending backlog at startup to avoid repeated answers after a restart.

        Fetches up to 100 pending updates without waiting and records the
        highest ``update_id`` found.

        Returns:
            The offset to use in the main loop, or ``None`` when the request
            failed or no usable ``update_id`` was pending.
        """
        payload: Dict[str, Any] = {
            "timeout": 0,
            "limit": 100,
            "allowed_updates": ["message"],
        }
        async with session.post(f"{self.base_url}/getUpdates", json=payload) as response:
            data = await response.json()
        if not data.get("ok"):
            logger.warning("Falha ao inicializar offset no Telegram: %s", data)
            return None
        updates = data.get("result") or []
        if not updates:
            return None
        # ``default`` avoids a ValueError when no update carries an integer id.
        last_id = max(
            (u.get("update_id") for u in updates if isinstance(u.get("update_id"), int)),
            default=-1,
        )
        if last_id < 0:
            return None
        self._last_update_id = last_id
        logger.info("Startup com backlog descartado: %s update(s) anteriores ignorados.", len(updates))
        return last_id + 1

    async def _get_updates(
        self,
        session: aiohttp.ClientSession,
        offset: int | None,
    ) -> List[Dict[str, Any]]:
        """
        Fetch new message updates from Telegram starting at ``offset``.

        Returns:
            The list of updates, or an empty list when Telegram answers with
            ``ok`` false. In that case the call also pauses briefly so the
            caller's loop does not retry immediately.
        """
        payload: Dict[str, Any] = {
            "timeout": self.polling_timeout,
            "limit": 100,
            "allowed_updates": ["message"],
        }
        if offset is not None:
            payload["offset"] = offset
        async with session.post(f"{self.base_url}/getUpdates", json=payload) as response:
            data = await response.json()
        if not data.get("ok"):
            logger.warning("Falha em getUpdates: %s", data)
            await asyncio.sleep(self._RETRY_DELAY_SECONDS)
            return []
        return data.get("result") or []

    async def _handle_update(
        self,
        session: aiohttp.ClientSession,
        update: Dict[str, Any],
    ) -> None:
        """
        Process one received update and send the answer to its chat.

        Updates without a chat id, or with neither text/caption nor a
        downloadable image, are ignored. Processing errors are converted into
        a user-facing message instead of being raised.
        """
        message = update.get("message") or {}
        text = message.get("text") or message.get("caption")
        chat = message.get("chat") or {}
        chat_id = chat.get("id")
        sender = message.get("from") or {}
        if not chat_id:
            # Nowhere to answer, so do not bother downloading attachments.
            return
        image_attachments = await self._extract_image_attachments(session=session, message=message)
        if not text and not image_attachments:
            return
        try:
            answer = await self._process_message(
                text=text or "",
                sender=sender,
                chat_id=chat_id,
                image_attachments=image_attachments,
            )
        except HTTPException as exc:
            logger.warning("Falha de dominio ao processar mensagem no Telegram: %s", exc.detail)
            answer = str(exc.detail) if exc.detail else "Nao foi possivel concluir a operacao solicitada."
        except Exception:
            logger.exception("Erro ao processar mensagem do Telegram.")
            answer = "Nao consegui processar sua solicitacao agora. Tente novamente em instantes."
        await self._send_message(session=session, chat_id=chat_id, text=answer)

    async def _send_message(
        self,
        session: aiohttp.ClientSession,
        chat_id: int,
        text: str,
    ) -> None:
        """Send ``text`` to the chat, split into as many messages as needed."""
        for chunk in _split_telegram_text(text):
            payload = {
                "chat_id": chat_id,
                "text": chunk,
            }
            async with session.post(f"{self.base_url}/sendMessage", json=payload) as response:
                data = await response.json()
            if not data.get("ok"):
                logger.warning("Falha em sendMessage: %s", data)

    async def _process_message(
        self,
        text: str,
        sender: Dict[str, Any],
        chat_id: int,
        image_attachments: List[Dict[str, Any]] | None = None,
    ) -> str:
        """
        Forward a message to the orchestrator on behalf of the identified user.

        When image attachments are present, the text extracted from them is
        combined with the caption and used as the message. If the image
        analysis reports a failure, that failure text is returned directly.

        Returns:
            The answer text to send back to the chat.
        """
        tools_db = SessionLocal()
        mock_db = SessionMockLocal()
        try:
            user_service = UserService(mock_db)
            # Fall back to the chat id when the sender carries no id.
            external_id = str(sender.get("id") or chat_id)
            first_name = (sender.get("first_name") or "").strip()
            last_name = (sender.get("last_name") or "").strip()
            display_name = f"{first_name} {last_name}".strip() or None
            username = sender.get("username")
            user = user_service.get_or_create(
                channel="telegram",
                external_id=external_id,
                name=display_name,
                username=username,
            )
            message_text = text
            if image_attachments:
                image_message = await self._build_orchestration_message_from_image(
                    caption=text,
                    image_attachments=image_attachments,
                )
                if self._is_image_analysis_failure_message(image_message):
                    return image_message
                message_text = image_message
            service = OrquestradorService(tools_db)
            return await service.handle_message(message=message_text, user_id=user.id)
        finally:
            tools_db.close()
            mock_db.close()

    def _is_supported_image_document(self, document: Dict[str, Any]) -> bool:
        """Return True when the document's MIME type starts with ``image/``."""
        mime_type = str((document or {}).get("mime_type") or "").strip().lower()
        return mime_type.startswith("image/")

    def _is_image_analysis_failure_message(self, text: str) -> bool:
        """Return True when ``text`` starts with one of the image-analysis blocking prefixes."""
        normalized = str(text or "").strip().lower()
        return any(normalized.startswith(prefix) for prefix in IMAGE_ANALYSIS_BLOCKING_PREFIXES)

    async def _extract_image_attachments(
        self,
        session: aiohttp.ClientSession,
        message: Dict[str, Any],
    ) -> List[Dict[str, Any]]:
        """
        Download the images attached to ``message``.

        Picks the photo entry with the largest ``file_size`` and, if present,
        a document whose MIME type is an image.

        Returns:
            The successfully downloaded attachments (possibly empty).
        """
        attachments: List[Dict[str, Any]] = []
        photos = message.get("photo") or []
        if isinstance(photos, list) and photos:
            best_photo = max(
                (item for item in photos if isinstance(item, dict) and item.get("file_id")),
                key=lambda item: int(item.get("file_size") or 0),
                default=None,
            )
            if best_photo is not None:
                attachment = await self._download_image_attachment(
                    session=session,
                    file_id=str(best_photo.get("file_id")),
                    mime_type="image/jpeg",
                    file_name="telegram_photo.jpg",
                )
                if attachment:
                    attachments.append(attachment)
        document = message.get("document") or {}
        if isinstance(document, dict) and document.get("file_id") and self._is_supported_image_document(document):
            attachment = await self._download_image_attachment(
                session=session,
                file_id=str(document.get("file_id")),
                mime_type=str(document.get("mime_type") or "image/jpeg"),
                file_name=str(document.get("file_name") or "telegram_image"),
            )
            if attachment:
                attachments.append(attachment)
        return attachments

    async def _download_image_attachment(
        self,
        session: aiohttp.ClientSession,
        file_id: str,
        mime_type: str,
        file_name: str,
    ) -> Dict[str, Any] | None:
        """
        Download one Telegram file and return its raw payload.

        Resolves ``file_id`` to a path with ``getFile`` and then fetches the
        bytes from the file endpoint.

        Returns:
            A dict with ``mime_type``, ``file_name`` and ``data`` (bytes), or
            ``None`` when the lookup fails, the download answers with status
            400 or above, or the file is empty.
        """
        async with session.post(f"{self.base_url}/getFile", json={"file_id": file_id}) as response:
            data = await response.json()
        if not data.get("ok"):
            logger.warning("Falha em getFile para imagem do Telegram: %s", data)
            return None
        file_path = str((data.get("result") or {}).get("file_path") or "").strip()
        if not file_path:
            return None
        async with session.get(f"{self.file_base_url}/{file_path}") as file_response:
            if file_response.status >= 400:
                logger.warning("Falha ao baixar arquivo do Telegram: status=%s path=%s", file_response.status, file_path)
                return None
            payload = await file_response.read()
        if not payload:
            return None
        return {
            "mime_type": mime_type,
            "file_name": file_name,
            "data": payload,
        }

    async def _build_orchestration_message_from_image(
        self,
        *,
        caption: str,
        image_attachments: List[Dict[str, Any]],
    ) -> str:
        """
        Combine the caption and the text extracted from the image into one message.

        Returns:
            The failure text when the extraction reports a failure; otherwise
            a message containing the caption and/or the extracted data; or a
            request for a clearer image when both are empty.
        """
        extracted_message = await LLMService().extract_image_workflow_message(
            caption=caption,
            attachments=image_attachments,
        )
        caption_text = str(caption or "").strip()
        extracted_text = str(extracted_message or "").strip()
        if self._is_image_analysis_failure_message(extracted_text):
            return extracted_text
        if caption_text and extracted_text:
            return (
                "[imagem recebida no telegram]\n"
                f"Legenda do usuario: {caption_text}\n"
                f"Dados extraidos da imagem: {extracted_text}"
            )
        if extracted_text:
            return f"[imagem recebida no telegram]\nDados extraidos da imagem: {extracted_text}"
        if caption_text:
            return caption_text
        return "Recebi uma imagem, mas preciso que voce descreva o documento ou envie uma foto mais nitida."
async def main() -> None:
    """
    Start the Telegram satellite and keep it processing updates.

    Validates the bot token and the runtime configuration, takes the
    single-instance lock, configures logging and runs the service. The lock
    handle is closed when the service stops.

    Raises:
        RuntimeError: If the bot token is not configured.
    """
    bot_token = settings.telegram_bot_token
    if not bot_token:
        raise RuntimeError("TELEGRAM_BOT_TOKEN nao configurado.")

    _ensure_supported_runtime_configuration()
    instance_lock = _acquire_single_instance_lock("orquestrador_telegram_satellite.lock")

    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s %(levelname)s %(name)s %(message)s",
    )
    satellite = TelegramSatelliteService(token=bot_token)
    # Leaving the ``with`` block closes the handle, which releases the lock.
    with instance_lock:
        await satellite.run()


# Script entry point: run the satellite when the module is executed directly.
if __name__ == "__main__":
    asyncio.run(main())