You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
125 lines
4.3 KiB
Python
125 lines
4.3 KiB
Python
import asyncio
|
|
import logging
|
|
from typing import Any, Dict, List
|
|
|
|
import aiohttp
|
|
from fastapi import HTTPException
|
|
|
|
from app.core.settings import settings
|
|
from app.db.database import SessionLocal
|
|
from app.services.orquestrador_service import OrquestradorService
|
|
|
|
|
|
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class TelegramSatelliteService:
    """
    Satellite interface for Telegram.

    Consumes bot updates through long polling, forwards each text message
    to OrquestradorService and publishes the reply in the originating chat.
    """

    # Telegram rejects sendMessage texts longer than 4096 characters.
    _MAX_MESSAGE_LENGTH = 4096
    # Pause before polling again after a failed getUpdates call, so an outage
    # or an API-level error does not turn the loop into a busy retry.
    _RETRY_DELAY_SECONDS = 5
    # Failures expected from one HTTP round trip: connection/response errors,
    # timeouts, and bodies that are not valid JSON (JSONDecodeError is a
    # ValueError subclass).
    _TRANSPORT_ERRORS = (aiohttp.ClientError, asyncio.TimeoutError, ValueError)

    def __init__(self, token: str):
        """Store the Bot API base URL and the configured timeouts.

        Args:
            token: Telegram bot token. It becomes part of ``base_url``, so
                the URL must never be written to logs.
        """
        self.base_url = f"https://api.telegram.org/bot{token}"
        self.polling_timeout = settings.telegram_polling_timeout
        self.request_timeout = settings.telegram_request_timeout

    async def run(self) -> None:
        """Run the long-polling loop, handling updates one at a time.

        The loop never returns on its own. Transport failures are absorbed
        by ``_get_updates`` and ``_send_message``, so a transient network
        problem does not stop the service.
        """
        logger.info("Telegram satellite iniciado com long polling.")
        offset = None
        timeout = aiohttp.ClientTimeout(total=self.request_timeout)

        async with aiohttp.ClientSession(timeout=timeout) as session:
            while True:
                updates = await self._get_updates(session=session, offset=offset)
                for update in updates:
                    # Move the offset forward before handling, so the next
                    # getUpdates call starts after this update even when
                    # handling it fails.
                    offset = update["update_id"] + 1
                    await self._handle_update(session=session, update=update)

    async def _get_updates(
        self,
        session: aiohttp.ClientSession,
        offset: int | None,
    ) -> List[Dict[str, Any]]:
        """Fetch pending updates from Telegram starting at ``offset``.

        Args:
            session: HTTP session used for the request.
            offset: First update id to fetch, or ``None`` to omit the field.

        Returns:
            The ``result`` list of the API reply, or an empty list when the
            request fails or Telegram answers with ``ok`` false. In both
            failure cases the method sleeps ``_RETRY_DELAY_SECONDS`` first.
        """
        payload: Dict[str, Any] = {
            "timeout": self.polling_timeout,
            "allowed_updates": ["message"],
        }
        if offset is not None:
            payload["offset"] = offset

        # The HTTP deadline has to outlast the long-poll window; with only
        # the session-wide timeout an idle poll could be cut off as a
        # timeout. Assumes both settings are numeric seconds.
        poll_timeout = aiohttp.ClientTimeout(
            total=self.polling_timeout + self.request_timeout
        )
        try:
            async with session.post(
                f"{self.base_url}/getUpdates",
                json=payload,
                timeout=poll_timeout,
            ) as response:
                data = await response.json()
        except self._TRANSPORT_ERRORS as exc:
            # Log only the exception type: aiohttp response errors include
            # the request URL, which contains the bot token.
            logger.warning("Falha de transporte em getUpdates: %s", type(exc).__name__)
            await asyncio.sleep(self._RETRY_DELAY_SECONDS)
            return []

        if not data.get("ok"):
            logger.warning("Falha em getUpdates: %s", data)
            await asyncio.sleep(self._RETRY_DELAY_SECONDS)
            return []
        return data.get("result", [])

    async def _handle_update(
        self,
        session: aiohttp.ClientSession,
        update: Dict[str, Any],
    ) -> None:
        """Process one update and send the resulting answer to its chat.

        Updates without message text or without a chat id are ignored.
        Errors raised while processing the text are turned into a reply
        message instead of propagating.

        Args:
            session: HTTP session used to send the reply.
            update: Update object as returned by getUpdates.
        """
        message = update.get("message") or {}
        text = message.get("text")
        chat_id = (message.get("chat") or {}).get("id")

        if not text or chat_id is None:
            return

        try:
            answer = await self._process_message(text=text)
        except HTTPException as exc:
            logger.warning("Falha de dominio ao processar mensagem no Telegram: %s", exc.detail)
            answer = str(exc.detail) if exc.detail else "Nao foi possivel concluir a operacao solicitada."
        except Exception:
            # Boundary of the message pipeline: report and keep polling.
            logger.exception("Erro ao processar mensagem do Telegram.")
            answer = "Nao consegui processar sua solicitacao agora. Tente novamente em instantes."

        await self._send_message(session=session, chat_id=chat_id, text=answer)

    async def _send_message(
        self,
        session: aiohttp.ClientSession,
        chat_id: int,
        text: str,
    ) -> None:
        """Send ``text`` to ``chat_id``, split into Telegram-sized chunks.

        Texts longer than ``_MAX_MESSAGE_LENGTH`` are sent as consecutive
        messages. A transport failure is logged and the remaining chunks
        are dropped; an API-level rejection is logged and sending continues.

        Args:
            session: HTTP session used for the requests.
            chat_id: Identifier of the destination chat.
            text: Message body to deliver.
        """
        limit = self._MAX_MESSAGE_LENGTH
        # ``or [text]`` keeps the original single attempt for an empty text.
        chunks = [text[start:start + limit] for start in range(0, len(text), limit)] or [text]

        for chunk in chunks:
            payload = {
                "chat_id": chat_id,
                "text": chunk,
            }
            try:
                async with session.post(f"{self.base_url}/sendMessage", json=payload) as response:
                    data = await response.json()
            except self._TRANSPORT_ERRORS as exc:
                # Exception type only, so the token-bearing URL is not logged.
                logger.warning("Falha de transporte em sendMessage: %s", type(exc).__name__)
                return
            if not data.get("ok"):
                logger.warning("Falha em sendMessage: %s", data)

    async def _process_message(self, text: str) -> str:
        """Forward ``text`` to the orchestrator and return its answer.

        A database session is opened for the call and always closed
        afterwards, whether or not the orchestrator raises.

        Args:
            text: Message text received from the chat.

        Returns:
            The value returned by ``OrquestradorService.handle_message``.
        """
        db = SessionLocal()
        try:
            service = OrquestradorService(db)
            return await service.handle_message(message=text)
        finally:
            db.close()
|
|
|
|
|
|
async def main() -> None:
    """Start the Telegram satellite service and keep it processing updates.

    Raises:
        RuntimeError: If ``settings.telegram_bot_token`` is empty or unset.
    """
    bot_token = settings.telegram_bot_token
    if not bot_token:
        raise RuntimeError("TELEGRAM_BOT_TOKEN nao configurado.")

    log_format = "%(asctime)s %(levelname)s %(name)s %(message)s"
    logging.basicConfig(level=logging.INFO, format=log_format)
    await TelegramSatelliteService(token=bot_token).run()
|
|
|
|
|
|
if __name__ == "__main__":
    # Script entry point: run the async main() until it finishes or raises.
    asyncio.run(main())
|