You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
orquestrador/app/services/orquestrador_service.py

449 lines
18 KiB
Python

import re
from datetime import datetime, timedelta
from fastapi import HTTPException
from sqlalchemy.orm import Session
from app.services.llm_service import LLMService
from app.services.tool_registry import ToolRegistry
class OrquestradorService:
# Memoria temporaria de confirmacao quando a API sugere novo horario (conflito 409).
PENDING_REVIEW_CONFIRMATIONS: dict[int, dict] = {}
PENDING_REVIEW_TTL_MINUTES = 30 # Pode ser alterado por uma variável de configuração caso o sistema cresça
# Rascunho por usuario para juntar dados de revisao enviados em mensagens separadas.
PENDING_REVIEW_DRAFTS: dict[int, dict] = {}
PENDING_REVIEW_DRAFT_TTL_MINUTES = 30
REVIEW_REQUIRED_FIELDS = (
"placa",
"data_hora",
"modelo",
"ano",
"km",
"revisao_previa_concessionaria",
)
LOW_VALUE_RESPONSES = {
"certo.",
"certo",
"ok.",
"ok",
"entendi.",
"entendi",
"claro.",
"claro",
}
def __init__(self, db: Session):
"""Inicializa servicos de LLM e registro de tools para a sessao atual."""
self.llm = LLMService()
self.registry = ToolRegistry(db)
async def handle_message(self, message: str, user_id: int | None = None) -> str:
"""Processa mensagem, executa tool quando necessario e retorna resposta final."""
# 1) Se houver sugestao pendente de horario e o usuario confirmou ("pode/sim"),
# agenda direto no horario sugerido.
confirmation_response = await self._try_confirm_pending_review(message=message, user_id=user_id)
if confirmation_response:
return confirmation_response
# 2) Fluxo de coleta incremental de dados da revisao (slot filling).
# Evita pedir tudo de novo quando o usuario responde em partes.
review_response = await self._try_collect_and_schedule_review(message=message, user_id=user_id)
if review_response:
return review_response
tools = self.registry.get_tools()
llm_result = await self.llm.generate_response(
message=self._build_router_prompt(user_message=message, user_id=user_id),
tools=tools,
)
if not llm_result["tool_call"] and self._is_operational_query(message):
llm_result = await self.llm.generate_response(
message=self._build_force_tool_prompt(user_message=message, user_id=user_id),
tools=tools,
)
if llm_result["tool_call"]:
tool_name = llm_result["tool_call"]["name"]
arguments = llm_result["tool_call"]["arguments"]
try:
tool_result = await self.registry.execute(
tool_name,
arguments,
user_id=user_id,
)
except HTTPException as exc:
self._capture_review_confirmation_suggestion(
tool_name=tool_name,
arguments=arguments,
exc=exc,
user_id=user_id,
)
return self._http_exception_detail(exc)
final_response = await self.llm.generate_response(
message=self._build_result_prompt(
user_message=message,
user_id=user_id,
tool_name=tool_name,
tool_result=tool_result,
),
tools=[],
)
text = (final_response.get("response") or "").strip()
if self._is_low_value_response(text):
return self._fallback_format_tool_result(tool_name, tool_result)
return text or self._fallback_format_tool_result(tool_name, tool_result)
text = (llm_result.get("response") or "").strip()
if self._is_low_value_response(text):
return "Entendi. Pode me dar mais detalhes para eu consultar corretamente?"
return text
def _is_low_value_response(self, text: str) -> bool:
return text.strip().lower() in self.LOW_VALUE_RESPONSES
def _is_review_intent(self, text: str) -> bool:
lowered = (text or "").lower()
return any(k in lowered for k in ("revis", "manutenc", "agendar", "horario"))
def _extract_review_fields(self, text: str) -> dict:
# Extrai os campos de revisao com regex simples para reduzir dependencia do LLM
# em mensagens curtas de follow-up.
lowered = (text or "").lower()
extracted: dict = {}
placa_match = re.search(r"\b([A-Za-z]{3}[0-9][A-Za-z0-9][0-9]{2}|[A-Za-z]{3}[0-9]{4})\b", text or "")
if placa_match:
extracted["placa"] = placa_match.group(1).upper()
dt_match = re.search(
r"(\d{1,2}[/-]\d{1,2}[/-]\d{4}\s*(?:as|às)?\s*\d{1,2}:\d{2})|"
r"(\d{4}[/-]\d{1,2}[/-]\d{1,2}\s*(?:as|às)?\s*\d{1,2}:\d{2})|"
r"(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}(?::\d{2})?(?:Z|[+-]\d{2}:\d{2})?)",
lowered,
)
if dt_match:
value = next((g for g in dt_match.groups() if g), None)
if value:
extracted["data_hora"] = re.sub(r"\s+às\s+", " as ", value, flags=re.IGNORECASE)
modelo_match = re.search(r"modelo\s+([a-z0-9][a-z0-9\s\-]{1,40})", lowered)
if modelo_match:
modelo = modelo_match.group(1).strip(" ,.;")
if modelo:
extracted["modelo"] = modelo.title()
ano_match = re.search(r"(?:ano\s*)?(19\d{2}|20\d{2})\b", lowered)
if ano_match:
extracted["ano"] = int(ano_match.group(1))
km_match = re.search(r"(\d{1,3}(?:[.\s]\d{3})*|\d+)\s*km\b", lowered)
if km_match:
km_text = re.sub(r"[.\s]", "", km_match.group(1))
if km_text.isdigit():
extracted["km"] = int(km_text)
if any(k in lowered for k in ("ja fiz revisao", "já fiz revisão", "ja fez revisao", "já fez revisão")):
extracted["revisao_previa_concessionaria"] = True
elif any(
k in lowered
for k in (
"nao fiz revisao",
"não fiz revisão",
"primeira revisao",
"primeira revisão",
"nunca fiz revisao",
"nunca fiz revisão",
)
):
extracted["revisao_previa_concessionaria"] = False
return extracted
def _render_missing_review_fields_prompt(self, missing_fields: list[str]) -> str:
labels = {
"placa": "a placa do veiculo",
"data_hora": "a data e hora desejada para a revisao",
"modelo": "o modelo do veiculo",
"ano": "o ano do veiculo",
"km": "a quilometragem atual (km)",
"revisao_previa_concessionaria": "se ja fez revisao na concessionaria (sim/nao)",
}
itens = [f"- {labels[field]}" for field in missing_fields]
return "Para agendar sua revisao, preciso dos dados abaixo:\n" + "\n".join(itens)
# Em vez de tentar entender tudo de uma vez, o bot mantém um "estado" do que já sabe e vai perguntando apenas o que falta (os "slots" vazios) até que a tarefa possa ser completada.
async def _try_collect_and_schedule_review(self, message: str, user_id: int | None) -> str | None:
if user_id is None:
return None
# Reaproveita rascunho anterior do usuario, se ainda estiver valido.
draft = self.PENDING_REVIEW_DRAFTS.get(user_id)
if draft and draft["expires_at"] < datetime.utcnow():
self.PENDING_REVIEW_DRAFTS.pop(user_id, None)
draft = None
extracted = self._extract_review_fields(message)
has_intent = self._is_review_intent(message)
# Sem intencao de revisao e sem rascunho aberto: nao interfere no fluxo normal.
if not has_intent and draft is None:
return None
if draft is None:
draft = {"payload": {}, "expires_at": datetime.utcnow() + timedelta(minutes=self.PENDING_REVIEW_DRAFT_TTL_MINUTES)}
# Permite o usuario "abortar" a coleta atual.
if "cancelar" in (message or "").lower() and draft["payload"]:
self.PENDING_REVIEW_DRAFTS.pop(user_id, None)
return None
# Merge incremental: apenas atualiza os campos detectados na mensagem atual.
draft["payload"].update(extracted)
draft["expires_at"] = datetime.utcnow() + timedelta(minutes=self.PENDING_REVIEW_DRAFT_TTL_MINUTES)
self.PENDING_REVIEW_DRAFTS[user_id] = draft
# Enquanto faltar campo obrigatorio, responde de forma deterministica
# (sem depender do LLM para lembrar contexto).
missing = [field for field in self.REVIEW_REQUIRED_FIELDS if field not in draft["payload"]]
if missing:
return self._render_missing_review_fields_prompt(missing)
try:
# Com payload completo, executa a tool de agendamento.
tool_result = await self.registry.execute(
"agendar_revisao",
draft["payload"],
user_id=user_id,
)
except HTTPException as exc:
# Se houver conflito com sugestao de horario, salva para confirmar com "pode/sim".
self._capture_review_confirmation_suggestion(
tool_name="agendar_revisao",
arguments=draft["payload"],
exc=exc,
user_id=user_id,
)
return self._http_exception_detail(exc)
finally:
# Limpa o rascunho apos tentativa final para evitar estado sujo.
self.PENDING_REVIEW_DRAFTS.pop(user_id, None)
return self._fallback_format_tool_result("agendar_revisao", tool_result)
def _is_affirmative_message(self, text: str) -> bool:
normalized = (text or "").strip().lower()
normalized = re.sub(r"[.!?,;:]+$", "", normalized)
return normalized in {"sim", "pode", "ok", "confirmo", "aceito", "fechado", "pode sim"}
def _is_negative_message(self, text: str) -> bool:
normalized = (text or "").strip().lower()
normalized = re.sub(r"[.!?,;:]+$", "", normalized)
return (
normalized in {"nao", "não", "nao quero", "não quero", "prefiro outro", "outro horario", "outro horário"}
or normalized.startswith("nao")
or normalized.startswith("não")
)
def _extract_time_only(self, text: str) -> str | None:
match = re.search(r"\b([01]?\d|2[0-3]):([0-5]\d)\b", text or "")
if not match:
return None
return f"{int(match.group(1)):02d}:{match.group(2)}"
def _merge_date_with_time(self, base_iso: str, new_time_hhmm: str) -> str | None:
try:
base_dt = datetime.fromisoformat((base_iso or "").replace("Z", "+00:00"))
except ValueError:
return None
try:
hour_text, minute_text = new_time_hhmm.split(":")
merged = base_dt.replace(hour=int(hour_text), minute=int(minute_text), second=0, microsecond=0)
return merged.isoformat()
except Exception:
return None
def _capture_review_confirmation_suggestion(
self,
tool_name: str,
arguments: dict,
exc: HTTPException,
user_id: int | None,
) -> None:
if tool_name != "agendar_revisao" or user_id is None or exc.status_code != 409:
return
detail = exc.detail if isinstance(exc.detail, str) else ""
match = re.search(r"ISO:\s*([^)]+)\)", detail)
if not match:
return
suggested_iso = match.group(1).strip()
payload = dict(arguments or {})
if not payload.get("placa"):
return
payload["data_hora"] = suggested_iso
self.PENDING_REVIEW_CONFIRMATIONS[user_id] = {
"payload": payload,
"expires_at": datetime.utcnow() + timedelta(minutes=self.PENDING_REVIEW_TTL_MINUTES),
}
async def _try_confirm_pending_review(self, message: str, user_id: int | None) -> str | None:
if user_id is None:
return None
pending = self.PENDING_REVIEW_CONFIRMATIONS.get(user_id)
if not pending:
return None
time_only = self._extract_time_only(message)
if self._is_negative_message(message) or time_only:
# Se o usuario recusar a sugestao e informar novo horario, reaproveita
# o payload pendente com a nova data/hora.
extracted = self._extract_review_fields(message)
new_data_hora = extracted.get("data_hora")
if not new_data_hora and time_only:
new_data_hora = self._merge_date_with_time(pending["payload"].get("data_hora", ""), time_only)
if not new_data_hora:
self.PENDING_REVIEW_CONFIRMATIONS.pop(user_id, None)
return "Sem problema. Me informe a nova data e hora desejada para a revisao."
payload = dict(pending["payload"])
payload["data_hora"] = new_data_hora
try:
tool_result = await self.registry.execute(
"agendar_revisao",
payload,
user_id=user_id,
)
except HTTPException as exc:
self.PENDING_REVIEW_CONFIRMATIONS.pop(user_id, None)
self._capture_review_confirmation_suggestion(
tool_name="agendar_revisao",
arguments=payload,
exc=exc,
user_id=user_id,
)
return self._http_exception_detail(exc)
self.PENDING_REVIEW_CONFIRMATIONS.pop(user_id, None)
return self._fallback_format_tool_result("agendar_revisao", tool_result)
if not self._is_affirmative_message(message):
return None
if pending["expires_at"] < datetime.utcnow():
self.PENDING_REVIEW_CONFIRMATIONS.pop(user_id, None)
return None
try:
tool_result = await self.registry.execute(
"agendar_revisao",
pending["payload"],
user_id=user_id,
)
except HTTPException as exc:
self.PENDING_REVIEW_CONFIRMATIONS.pop(user_id, None)
return self._http_exception_detail(exc)
self.PENDING_REVIEW_CONFIRMATIONS.pop(user_id, None)
return self._fallback_format_tool_result("agendar_revisao", tool_result)
def _is_operational_query(self, message: str) -> bool:
text = message.lower()
keywords = (
"estoque",
"carro",
"carros",
"suv",
"sedan",
"hatch",
"pickup",
"financi",
"cpf",
"troca",
"revis",
"placa",
"cancelar pedido",
"comprar",
"compra",
"realizar pedido",
"pedido",
)
return any(k in text for k in keywords)
def _build_router_prompt(self, user_message: str, user_id: int | None) -> str:
user_context = f"Contexto de usuario autenticado: user_id={user_id}.\n" if user_id else ""
return (
"Voce e um assistente de concessionaria. "
"Sempre que a solicitacao depender de dados operacionais (estoque, validacao de cliente, "
"avaliacao de troca, agendamento de revisao, realizacao ou cancelamento de pedido), use a tool correta. "
"Se faltar parametro obrigatorio para a tool, responda em texto pedindo apenas o que falta.\n\n"
f"{user_context}"
f"Mensagem do usuario: {user_message}"
)
def _build_force_tool_prompt(self, user_message: str, user_id: int | None) -> str:
user_context = f"Contexto de usuario autenticado: user_id={user_id}.\n" if user_id else ""
return (
"Reavalie a mensagem e priorize chamar tool se houver intencao operacional. "
"Use texto apenas quando faltar dado obrigatorio.\n\n"
f"{user_context}"
f"Mensagem do usuario: {user_message}"
)
def _build_result_prompt(
self,
user_message: str,
user_id: int | None,
tool_name: str,
tool_result,
) -> str:
user_context = f"Contexto de usuario autenticado: user_id={user_id}.\n" if user_id else ""
return (
"Responda ao usuario de forma objetiva usando o resultado da tool abaixo. "
"Nao invente dados. Se a lista vier vazia, diga explicitamente que nao encontrou resultados.\n\n"
f"{user_context}"
f"Pergunta original: {user_message}\n"
f"Tool executada: {tool_name}\n"
f"Resultado da tool: {tool_result}"
)
def _http_exception_detail(self, exc: HTTPException) -> str:
detail = exc.detail
if isinstance(detail, str):
return detail
return "Nao foi possivel concluir a operacao solicitada."
def _fallback_format_tool_result(self, tool_name: str, tool_result) -> str:
if tool_name == "consultar_estoque":
if not tool_result:
return "Nao encontrei nenhum veiculo com os criterios informados."
return f"Encontrei {len(tool_result)} veiculo(s) com os criterios informados."
if tool_name == "cancelar_pedido" and isinstance(tool_result, dict):
numero = tool_result.get("numero_pedido", "N/A")
status = tool_result.get("status", "N/A")
return f"Pedido {numero} atualizado com status {status}."
if tool_name == "realizar_pedido" and isinstance(tool_result, dict):
numero = tool_result.get("numero_pedido", "N/A")
return f"Pedido {numero} criado com sucesso."
if tool_name == "agendar_revisao" and isinstance(tool_result, dict):
placa = tool_result.get("placa", "N/A")
data_hora = tool_result.get("data_hora", "N/A")
protocolo = tool_result.get("protocolo", "N/A")
valor = tool_result.get("valor_revisao")
if isinstance(valor, (int, float)):
return f"Revisao agendada para placa {placa} em {data_hora}. Valor estimado: R$ {valor:.2f}. Protocolo: {protocolo}."
return f"Revisao agendada para placa {placa} em {data_hora}. Protocolo: {protocolo}."
if tool_name == "validar_cliente_venda" and isinstance(tool_result, dict):
aprovado = tool_result.get("aprovado")
return "Cliente aprovado para financiamento." if aprovado else "Cliente nao aprovado para financiamento."
return "Operacao concluida com sucesso."