From 31916bacc33e01de70ac4f5d06ef0a17d16b9ec8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Vitor=20Hugo=20Belorio=20Sim=C3=A3o?= Date: Mon, 2 Mar 2026 10:13:23 -0300 Subject: [PATCH] =?UTF-8?q?=F0=9F=90=9B=20fix(telegram):=20tratar=20erros?= =?UTF-8?q?=20de=20dominio=20e=20melhorar=20respostas=20operacionais?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../telegram_satellite_service.py | 124 ++++++++++++++++++ app/services/orquestrador_service.py | 117 +++++++++++++++-- 2 files changed, 231 insertions(+), 10 deletions(-) create mode 100644 app/integrations/telegram_satellite_service.py diff --git a/app/integrations/telegram_satellite_service.py b/app/integrations/telegram_satellite_service.py new file mode 100644 index 0000000..1f90411 --- /dev/null +++ b/app/integrations/telegram_satellite_service.py @@ -0,0 +1,124 @@ +import asyncio +import logging +from typing import Any, Dict, List + +import aiohttp +from fastapi import HTTPException + +from app.core.settings import settings +from app.db.database import SessionLocal +from app.services.orquestrador_service import OrquestradorService + + +logger = logging.getLogger(__name__) + + +class TelegramSatelliteService: + """ + Interface satelite para Telegram. + Processa mensagens direto no OrquestradorService e publica respostas no chat. + """ + + def __init__(self, token: str): + """Configura cliente Telegram com URL base e timeouts padrao.""" + self.base_url = f"https://api.telegram.org/bot{token}" + self.polling_timeout = settings.telegram_polling_timeout + self.request_timeout = settings.telegram_request_timeout + + async def run(self) -> None: + """Inicia loop de long polling para consumir atualizacoes do bot.""" + logger.info("Telegram satellite iniciado com long polling.") + offset = None + timeout = aiohttp.ClientTimeout(total=self.request_timeout) + + async with aiohttp.ClientSession(timeout=timeout) as session: + while True: + updates = await self._get_updates(session=session, offset=offset) + for update in updates: + offset = update["update_id"] + 1 + await self._handle_update(session=session, update=update) + + async def _get_updates( + self, + session: aiohttp.ClientSession, + offset: int | None, + ) -> List[Dict[str, Any]]: + """Busca novas mensagens no Telegram a partir do offset informado.""" + payload: Dict[str, Any] = { + "timeout": self.polling_timeout, + "allowed_updates": ["message"], + } + if offset is not None: + payload["offset"] = offset + + async with session.post(f"{self.base_url}/getUpdates", json=payload) as response: + data = await response.json() + if not data.get("ok"): + logger.warning("Falha em getUpdates: %s", data) + return [] + return data.get("result", []) + + async def _handle_update( + self, + session: aiohttp.ClientSession, + update: Dict[str, Any], + ) -> None: + """Processa uma atualizacao recebida e envia resposta ao chat.""" + message = update.get("message", {}) + text = message.get("text") + chat = message.get("chat", {}) + chat_id = chat.get("id") + + if not text or not chat_id: + return + + try: + answer = await self._process_message(text=text) + except HTTPException as exc: + logger.warning("Falha de dominio ao processar mensagem no Telegram: %s", exc.detail) + answer = str(exc.detail) if exc.detail else "Nao foi possivel concluir a operacao solicitada." + except Exception: + logger.exception("Erro ao processar mensagem do Telegram.") + answer = "Nao consegui processar sua solicitacao agora. Tente novamente em instantes." + + await self._send_message(session=session, chat_id=chat_id, text=answer) + + async def _send_message( + self, + session: aiohttp.ClientSession, + chat_id: int, + text: str, + ) -> None: + """Envia mensagem de texto para o chat informado no Telegram.""" + payload = { + "chat_id": chat_id, + "text": text, + } + async with session.post(f"{self.base_url}/sendMessage", json=payload) as response: + data = await response.json() + if not data.get("ok"): + logger.warning("Falha em sendMessage: %s", data) + + async def _process_message(self, text: str) -> str: + """Encaminha mensagem ao orquestrador e retorna a resposta gerada.""" + db = SessionLocal() + try: + service = OrquestradorService(db) + return await service.handle_message(message=text) + finally: + db.close() + + +async def main() -> None: + """Inicializa servico satelite do Telegram e inicia processamento continuo.""" + token = settings.telegram_bot_token + if not token: + raise RuntimeError("TELEGRAM_BOT_TOKEN nao configurado.") + + logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(name)s %(message)s") + service = TelegramSatelliteService(token=token) + await service.run() + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/app/services/orquestrador_service.py b/app/services/orquestrador_service.py index 72eee95..9699428 100644 --- a/app/services/orquestrador_service.py +++ b/app/services/orquestrador_service.py @@ -1,3 +1,4 @@ +from fastapi import HTTPException from sqlalchemy.orm import Session from app.services.llm_service import LLMService @@ -5,6 +6,16 @@ from app.services.tool_registry import ToolRegistry class OrquestradorService: + LOW_VALUE_RESPONSES = { + "certo.", + "certo", + "ok.", + "ok", + "entendi.", + "entendi", + "claro.", + "claro", + } def __init__(self, db: Session): """Inicializa servicos de LLM e registro de tools para a sessao atual.""" @@ -13,28 +24,114 @@ class OrquestradorService: async def handle_message(self, message: str) -> str: """Processa mensagem, executa tool quando necessario e retorna resposta final.""" - tools = self.registry.get_tools() llm_result = await self.llm.generate_response( - message=message, + message=self._build_router_prompt(message), tools=tools, ) + if not llm_result["tool_call"] and self._is_operational_query(message): + llm_result = await self.llm.generate_response( + message=self._build_force_tool_prompt(message), + tools=tools, + ) + if llm_result["tool_call"]: tool_name = llm_result["tool_call"]["name"] arguments = llm_result["tool_call"]["arguments"] - tool_result = await self.registry.execute(tool_name, arguments) + try: + tool_result = await self.registry.execute(tool_name, arguments) + except HTTPException as exc: + return self._http_exception_detail(exc) - # Segunda rodada para formatar resposta final_response = await self.llm.generate_response( - message=f"Resultado da funcao {tool_name}: {tool_result}", - tools=tools, + message=self._build_result_prompt( + user_message=message, + tool_name=tool_name, + tool_result=tool_result, + ), + tools=[], ) + text = (final_response.get("response") or "").strip() + if self._is_low_value_response(text): + return self._fallback_format_tool_result(tool_name, tool_result) + + return text or self._fallback_format_tool_result(tool_name, tool_result) + + text = (llm_result.get("response") or "").strip() + if self._is_low_value_response(text): + return "Entendi. Pode me dar mais detalhes para eu consultar corretamente?" + return text + + def _is_low_value_response(self, text: str) -> bool: + return text.strip().lower() in self.LOW_VALUE_RESPONSES + + def _is_operational_query(self, message: str) -> bool: + text = message.lower() + keywords = ( + "estoque", + "carro", + "carros", + "suv", + "sedan", + "hatch", + "pickup", + "financi", + "cpf", + "troca", + "revis", + "placa", + "cancelar pedido", + "pedido", + ) + return any(k in text for k in keywords) + + def _build_router_prompt(self, user_message: str) -> str: + return ( + "Voce e um assistente de concessionaria. " + "Sempre que a solicitacao depender de dados operacionais (estoque, validacao de cliente, " + "avaliacao de troca, agendamento de revisao ou cancelamento de pedido), use a tool correta. " + "Se faltar parametro obrigatorio para a tool, responda em texto pedindo apenas o que falta.\n\n" + f"Mensagem do usuario: {user_message}" + ) + + def _build_force_tool_prompt(self, user_message: str) -> str: + return ( + "Reavalie a mensagem e priorize chamar tool se houver intencao operacional. " + "Use texto apenas quando faltar dado obrigatorio.\n\n" + f"Mensagem do usuario: {user_message}" + ) + + def _build_result_prompt(self, user_message: str, tool_name: str, tool_result) -> str: + return ( + "Responda ao usuario de forma objetiva usando o resultado da tool abaixo. " + "Nao invente dados. Se a lista vier vazia, diga explicitamente que nao encontrou resultados.\n\n" + f"Pergunta original: {user_message}\n" + f"Tool executada: {tool_name}\n" + f"Resultado da tool: {tool_result}" + ) + + def _http_exception_detail(self, exc: HTTPException) -> str: + detail = exc.detail + if isinstance(detail, str): + return detail + return "Nao foi possivel concluir a operacao solicitada." + + def _fallback_format_tool_result(self, tool_name: str, tool_result) -> str: + if tool_name == "consultar_estoque": + if not tool_result: + return "Nao encontrei nenhum veiculo com os criterios informados." + return f"Encontrei {len(tool_result)} veiculo(s) com os criterios informados." + + if tool_name == "cancelar_pedido" and isinstance(tool_result, dict): + numero = tool_result.get("numero_pedido", "N/A") + status = tool_result.get("status", "N/A") + return f"Pedido {numero} atualizado com status {status}." - return final_response["response"] + if tool_name == "validar_cliente_venda" and isinstance(tool_result, dict): + aprovado = tool_result.get("aprovado") + return "Cliente aprovado para financiamento." if aprovado else "Cliente nao aprovado para financiamento." - # Se o modelo nao chamou nenhuma tool, - # significa que ele respondeu diretamente em texto. - return llm_result["response"] + return "Operacao concluida com sucesso."