You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
1544 lines
63 KiB
Python
1544 lines
63 KiB
Python
import re
|
|
import json
|
|
import logging
|
|
import unicodedata
|
|
from datetime import datetime, timedelta
|
|
|
|
from fastapi import HTTPException
|
|
from sqlalchemy.orm import Session
|
|
|
|
from app.services.orchestrator_config import (
|
|
CANCEL_ORDER_REQUIRED_FIELDS,
|
|
DETERMINISTIC_RESPONSE_TOOLS,
|
|
LOW_VALUE_RESPONSES,
|
|
ORDER_REQUIRED_FIELDS,
|
|
PENDING_CANCEL_ORDER_DRAFT_TTL_MINUTES,
|
|
PENDING_ORDER_DRAFT_TTL_MINUTES,
|
|
PENDING_REVIEW_DRAFT_TTL_MINUTES,
|
|
PENDING_REVIEW_TTL_MINUTES,
|
|
REVIEW_REQUIRED_FIELDS,
|
|
USER_CONTEXT_TTL_MINUTES,
|
|
)
|
|
from app.services.llm_service import LLMService
|
|
from app.services.tool_registry import ToolRegistry
|
|
|
|
# Module-level logger named after this module; used by the service for warnings and exceptions.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class OrquestradorService:
|
|
# Conversation context per user id (active domain, memories, order queue, expiry).
USER_CONTEXTS: dict[int, dict] = {}

# Temporary confirmation memory used when the API suggests a new time slot (409 conflict).
PENDING_REVIEW_CONFIRMATIONS: dict[int, dict] = {}
# Per-user draft used to merge review data sent across separate messages.
PENDING_REVIEW_DRAFTS: dict[int, dict] = {}
# Per-user draft of an order being created.
PENDING_ORDER_DRAFTS: dict[int, dict] = {}
# Per-user draft of an order cancellation being collected.
PENDING_CANCEL_ORDER_DRAFTS: dict[int, dict] = {}
|
|
|
|
def __init__(self, db: Session):
    """Initialise the LLM service and the tool registry for the current session.

    Args:
        db: SQLAlchemy session handed to ``ToolRegistry``.
    """
    self.llm = LLMService()
    self.registry = ToolRegistry(db)
|
|
|
|
async def handle_message(self, message: str, user_id: int | None = None) -> str:
    """Process a message, run a tool when needed and return the final reply.

    The message goes through these stages, and the first stage that produces
    a reply ends the call:

    1. refresh/create the user context and ask the LLM for a routing plan;
    2. reduce the plan to a single request (other requests may be queued);
    3. extract entities/intents and store generic memory;
    4. context-switch confirmation, review listing, pending review
       confirmation, then the review / cancel-order / create-order
       slot-filling flows;
    5. generic LLM tool routing, with one forced-tool retry when an
       operational intent was detected but no tool was chosen.

    Args:
        message: Raw user message.
        user_id: Identifier of the user, or ``None`` for anonymous callers.

    Returns:
        The reply text, after it went through ``finish``.
    """
    async def finish(response: str, queue_notice: str | None = None) -> str:
        # Every exit path goes through here: compose the reply with the
        # optional queue notice, then let the next queued request (if any)
        # be processed and appended.
        composed = self._compose_order_aware_response(
            response=response,
            user_id=user_id,
            queue_notice=queue_notice,
        )
        return await self._maybe_auto_advance_next_order(
            base_response=composed,
            user_id=user_id,
        )

    self._upsert_user_context(user_id=user_id)

    routing_plan = await self._extract_routing_with_llm(message=message, user_id=user_id)

    (
        routing_message,
        queue_notice,
        queue_early_response,
    ) = self._prepare_message_for_single_order(
        message=message,
        user_id=user_id,
        routing_plan=routing_plan,
    )
    if queue_early_response:
        return await finish(queue_early_response, queue_notice=queue_notice)

    extracted_entities = await self._extract_entities_with_llm(
        message=routing_message,
        user_id=user_id,
    )
    self._capture_generic_memory(
        user_id=user_id,
        llm_generic_fields=extracted_entities.get("generic_memory", {}),
    )

    domain_hint = self._domain_from_intents(extracted_entities.get("intents", {}))
    context_switch_response = self._handle_context_switch(
        message=routing_message,
        user_id=user_id,
        target_domain_hint=domain_hint,
    )
    if context_switch_response:
        return await finish(context_switch_response, queue_notice=queue_notice)

    self._update_active_domain(user_id=user_id, domain_hint=domain_hint)

    review_management_response = await self._try_handle_review_management(
        message=routing_message,
        user_id=user_id,
        intents=extracted_entities.get("intents", {}),
    )
    if review_management_response:
        return await finish(review_management_response, queue_notice=queue_notice)

    # 1) If a suggested time slot is pending and the user confirmed ("pode/sim"),
    # schedule directly at the suggested slot.
    confirmation_response = await self._try_confirm_pending_review(
        message=routing_message,
        user_id=user_id,
        extracted_review_fields=extracted_entities.get("review_fields", {}),
    )
    if confirmation_response:
        return await finish(confirmation_response, queue_notice=queue_notice)
    # 2) Incremental collection of review data (slot filling).
    # Avoids asking for everything again when the user answers in parts.
    review_response = await self._try_collect_and_schedule_review(
        message=routing_message,
        user_id=user_id,
        extracted_fields=extracted_entities.get("review_fields", {}),
        intents=extracted_entities.get("intents", {}),
    )
    if review_response:
        return await finish(review_response, queue_notice=queue_notice)
    # 3) Incremental collection flow for order cancellation.
    cancel_order_response = await self._try_collect_and_cancel_order(
        message=routing_message,
        user_id=user_id,
        extracted_fields=extracted_entities.get("cancel_order_fields", {}),
        intents=extracted_entities.get("intents", {}),
    )
    if cancel_order_response:
        return await finish(cancel_order_response, queue_notice=queue_notice)
    # 4) Incremental collection flow for order creation.
    order_response = await self._try_collect_and_create_order(
        message=routing_message,
        user_id=user_id,
        extracted_fields=extracted_entities.get("order_fields", {}),
        intents=extracted_entities.get("intents", {}),
    )
    if order_response:
        return await finish(order_response, queue_notice=queue_notice)

    tools = self.registry.get_tools()

    llm_result = await self.llm.generate_response(
        message=self._build_router_prompt(user_message=routing_message, user_id=user_id),
        tools=tools,
    )

    # No tool chosen although an operational intent was detected: retry once
    # with the force-tool prompt.
    if not llm_result["tool_call"] and self._has_operational_intent(extracted_entities):
        llm_result = await self.llm.generate_response(
            message=self._build_force_tool_prompt(user_message=routing_message, user_id=user_id),
            tools=tools,
        )

    if llm_result["tool_call"]:
        tool_name = llm_result["tool_call"]["name"]
        arguments = llm_result["tool_call"]["arguments"]

        try:
            tool_result = await self.registry.execute(
                tool_name,
                arguments,
                user_id=user_id,
            )
        except HTTPException as exc:
            # Give the review flow a chance to record a suggestion carried by
            # the error before replying with the error detail.
            self._capture_review_confirmation_suggestion(
                tool_name=tool_name,
                arguments=arguments,
                exc=exc,
                user_id=user_id,
            )
            return await finish(self._http_exception_detail(exc), queue_notice=queue_notice)

        if self._should_use_deterministic_response(tool_name):
            return await finish(
                self._fallback_format_tool_result(tool_name, tool_result),
                queue_notice=queue_notice,
            )

        final_response = await self.llm.generate_response(
            message=self._build_result_prompt(
                user_message=routing_message,
                user_id=user_id,
                tool_name=tool_name,
                tool_result=tool_result,
            ),
            tools=[],
        )
        text = (final_response.get("response") or "").strip()
        # A low-value LLM summary is replaced by the deterministic formatting.
        if self._is_low_value_response(text):
            return await finish(
                self._fallback_format_tool_result(tool_name, tool_result),
                queue_notice=queue_notice,
            )

        return await finish(
            text or self._fallback_format_tool_result(tool_name, tool_result),
            queue_notice=queue_notice,
        )

    # No tool call at all: answer with the LLM text, or ask for more details
    # when that text is low-value.
    text = (llm_result.get("response") or "").strip()
    if self._is_low_value_response(text):
        return await finish(
            "Entendi. Pode me dar mais detalhes para eu consultar corretamente?",
            queue_notice=queue_notice,
        )
    return await finish(text, queue_notice=queue_notice)
|
|
|
|
def _reset_pending_review_states(self, user_id: int | None) -> None:
|
|
if user_id is None:
|
|
return
|
|
self.PENDING_REVIEW_DRAFTS.pop(user_id, None)
|
|
self.PENDING_REVIEW_CONFIRMATIONS.pop(user_id, None)
|
|
|
|
def _reset_pending_order_states(self, user_id: int | None) -> None:
|
|
if user_id is None:
|
|
return
|
|
self.PENDING_ORDER_DRAFTS.pop(user_id, None)
|
|
self.PENDING_CANCEL_ORDER_DRAFTS.pop(user_id, None)
|
|
|
|
# Nessa função é onde eu configuro a memória volátil do sistema
|
|
def _upsert_user_context(self, user_id: int | None) -> None:
|
|
if user_id is None:
|
|
return
|
|
now = datetime.utcnow()
|
|
context = self.USER_CONTEXTS.get(user_id)
|
|
if context and context["expires_at"] >= now:
|
|
context["expires_at"] = now + timedelta(minutes=USER_CONTEXT_TTL_MINUTES)
|
|
return
|
|
self.USER_CONTEXTS[user_id] = {
|
|
"active_domain": "general",
|
|
"generic_memory": {},
|
|
"shared_memory": {},
|
|
"order_queue": [],
|
|
"pending_switch": None,
|
|
"expires_at": now + timedelta(minutes=USER_CONTEXT_TTL_MINUTES),
|
|
}
|
|
|
|
def _get_user_context(self, user_id: int | None) -> dict | None:
|
|
if user_id is None:
|
|
return None
|
|
context = self.USER_CONTEXTS.get(user_id)
|
|
if not context:
|
|
return None
|
|
if context["expires_at"] < datetime.utcnow():
|
|
self.USER_CONTEXTS.pop(user_id, None)
|
|
return None
|
|
return context
|
|
|
|
def _extract_generic_memory_fields(self, llm_generic_fields: dict | None = None) -> dict:
    """Return the normalised generic-memory fields found in the LLM payload.

    Delegates to ``_normalize_generic_fields``, which applies the same plate,
    CPF, budget and vehicle-profile normalisation this method used to
    duplicate. As a side benefit, a non-dict payload now yields ``{}``
    instead of raising ``AttributeError``.

    Args:
        llm_generic_fields: ``generic_memory`` section extracted by the LLM,
            or ``None``.

    Returns:
        A new dict holding only the fields that normalised to a usable value.
    """
    return self._normalize_generic_fields(llm_generic_fields)
|
|
|
|
def _capture_generic_memory(
    self,
    user_id: int | None,
    llm_generic_fields: dict | None = None,
) -> None:
    """Merge the generic fields extracted by the LLM into the user's memories.

    Nothing happens when the user has no live context or no field survived
    normalisation.
    """
    user_state = self._get_user_context(user_id)
    if not user_state:
        return
    captured = self._extract_generic_memory_fields(llm_generic_fields=llm_generic_fields)
    if not captured:
        return
    # "Generic memory" is a dict accumulated per user: new fields are added
    # and repeated fields overwrite the previous value.
    user_state["generic_memory"].update(captured)
    shared_bucket = user_state.setdefault("shared_memory", {})
    shared_bucket.update(captured)
|
|
|
|
def _new_tab_memory(self, user_id: int | None) -> dict:
    """Return a shallow copy of the user's shared memory.

    Returns an empty dict when there is no live context or the stored
    shared memory is not a dict.
    """
    user_state = self._get_user_context(user_id)
    shared = user_state.get("shared_memory", {}) if user_state else None
    return dict(shared) if isinstance(shared, dict) else {}
|
|
|
|
def _empty_extraction_payload(self) -> dict:
|
|
return {
|
|
"generic_memory": {},
|
|
"review_fields": {},
|
|
"order_fields": {},
|
|
"cancel_order_fields": {},
|
|
"intents": {},
|
|
}
|
|
|
|
def _coerce_extraction_contract(self, payload) -> dict:
    """Force an LLM payload into the extraction contract.

    Every contract section is present in the result; a section that is
    missing or not a dict becomes ``{}``. Missing sections are logged at
    info level. A non-dict payload yields the empty contract.
    """
    template = self._empty_extraction_payload()
    if not isinstance(payload, dict):
        return template
    coerced: dict = {}
    for section in template:
        if section not in payload:
            logger.info("Extracao sem secao '%s'; usando vazio.", section)
        candidate = payload.get(section)
        coerced[section] = candidate if isinstance(candidate, dict) else {}
    return coerced
|
|
|
|
async def _extract_routing_with_llm(self, message: str, user_id: int | None) -> dict:
    """Ask the LLM to split the message into per-domain requests.

    Returns ``{"orders": [{"domain": ..., "message": ...}, ...]}`` where
    each domain is one of ``review``, ``sales`` or ``general``. When the LLM
    answer is unusable (not a JSON object, no list, no valid item) or the
    call raises, a single ``general`` order holding the whole message is
    returned instead.
    """
    fallback = {"orders": [{"domain": "general", "message": (message or "").strip()}]}
    prompt = (
        "Analise a mensagem e retorne APENAS JSON valido para roteamento multiassunto.\n"
        "Sem markdown e sem texto extra.\n\n"
        "Formato:\n"
        "{\n"
        '  "orders": [\n'
        '    {"domain": "review|sales|general", "message": "trecho literal do pedido"}\n'
        "  ]\n"
        "}\n\n"
        "Regras:\n"
        "- Se houver mais de um pedido operacional, separe em itens distintos em ordem de aparicao.\n"
        "- Se nao houver pedido operacional, use domain='general' com a mensagem inteira.\n"
        "- Mantenha cada message curta e fiel ao texto do usuario.\n\n"
        f"Contexto: user_id={user_id if user_id is not None else 'anonimo'}\n"
        f"Mensagem do usuario: {message}"
    )
    known_domains = {"review", "sales", "general"}
    try:
        llm_reply = await self.llm.generate_response(message=prompt, tools=[])
        parsed = self._parse_json_object((llm_reply.get("response") or "").strip())
        if not isinstance(parsed, dict):
            logger.warning("Roteamento invalido (nao JSON objeto). user_id=%s", user_id)
            return fallback
        raw_orders = parsed.get("orders")
        if not isinstance(raw_orders, list):
            return fallback
        cleaned: list[dict] = []
        for entry in raw_orders:
            if not isinstance(entry, dict):
                continue
            domain = str(entry.get("domain") or "general").strip().lower()
            if domain not in known_domains:
                domain = "general"
            segment = str(entry.get("message") or "").strip()
            if not segment:
                # Items without text carry nothing to process.
                continue
            cleaned.append({"domain": domain, "message": segment})
        return {"orders": cleaned} if cleaned else fallback
    except Exception:
        logger.exception("Falha ao rotear multiassunto com LLM. user_id=%s", user_id)
        return fallback
|
|
|
|
async def _extract_entities_with_llm(self, message: str, user_id: int | None) -> dict:
    """Ask the LLM for entities/intents and return them normalised.

    The result always has the sections ``generic_memory``, ``review_fields``,
    ``order_fields``, ``cancel_order_fields`` and ``intents``. An empty
    contract is returned when the LLM answer is empty, is not a JSON object,
    or anything raises along the way.
    """
    fallback = self._empty_extraction_payload()
    user_context = "user_id=anonimo" if user_id is None else f"user_id={user_id}"
    prompt = (
        "Extraia entidades da mensagem do usuario e retorne APENAS JSON valido.\n"
        "Nao use markdown, nao adicione texto antes/depois, nao invente dados ausentes.\n"
        "Se nao houver valor, use null ou lista vazia.\n\n"
        "Formato obrigatorio:\n"
        "{\n"
        '  "generic_memory": {\n'
        '    "placa": null,\n'
        '    "cpf": null,\n'
        '    "orcamento_max": null,\n'
        '    "perfil_veiculo": []\n'
        "  },\n"
        '  "review_fields": {\n'
        '    "placa": null,\n'
        '    "data_hora": null,\n'
        '    "modelo": null,\n'
        '    "ano": null,\n'
        '    "km": null,\n'
        '    "revisao_previa_concessionaria": null\n'
        "  },\n"
        '  "order_fields": {\n'
        '    "cpf": null,\n'
        '    "valor_veiculo": null\n'
        "  },\n"
        '  "cancel_order_fields": {\n'
        '    "numero_pedido": null,\n'
        '    "motivo": null\n'
        "  },\n"
        '  "intents": {\n'
        '    "review_schedule": false,\n'
        '    "review_list": false,\n'
        '    "order_create": false,\n'
        '    "order_cancel": false\n'
        "  }\n"
        "}\n\n"
        f"Contexto: {user_context}\n"
        f"Mensagem do usuario: {message}"
    )

    try:
        llm_reply = await self.llm.generate_response(message=prompt, tools=[])
        raw_text = (llm_reply.get("response") or "").strip()
        if not raw_text:
            logger.warning("Extracao vazia do LLM. user_id=%s", user_id)
            return fallback
        parsed = self._parse_json_object(raw_text)
        if not isinstance(parsed, dict):
            logger.warning("Extracao invalida (nao JSON objeto). user_id=%s", user_id)
            return fallback
        coerced = self._coerce_extraction_contract(parsed)
        # One normaliser per contract section, applied in contract order.
        normalizers = (
            ("generic_memory", self._normalize_generic_fields),
            ("review_fields", self._normalize_review_fields),
            ("order_fields", self._normalize_order_fields),
            ("cancel_order_fields", self._normalize_cancel_order_fields),
            ("intents", self._normalize_intents),
        )
        return {section: normalize(coerced.get(section)) for section, normalize in normalizers}
    except Exception:
        logger.exception("Falha ao extrair entidades com LLM. user_id=%s", user_id)
        return fallback
|
|
|
|
def _parse_json_object(self, text: str):
|
|
candidate = (text or "").strip()
|
|
if not candidate:
|
|
return None
|
|
if candidate.startswith("```"):
|
|
candidate = re.sub(r"^```(?:json)?\s*", "", candidate, flags=re.IGNORECASE)
|
|
candidate = re.sub(r"\s*```$", "", candidate)
|
|
try:
|
|
return json.loads(candidate)
|
|
except json.JSONDecodeError:
|
|
match = re.search(r"\{.*\}", candidate, flags=re.DOTALL)
|
|
if not match:
|
|
logger.warning("Extracao sem JSON valido no texto retornado.")
|
|
return None
|
|
try:
|
|
return json.loads(match.group(0))
|
|
except json.JSONDecodeError:
|
|
logger.warning("Extracao com JSON invalido apos recorte.")
|
|
return None
|
|
|
|
def _normalize_plate(self, value) -> str | None:
|
|
text = str(value or "").strip().upper()
|
|
if not text:
|
|
return None
|
|
if re.fullmatch(r"[A-Z]{3}[0-9][A-Z0-9][0-9]{2}", text) or re.fullmatch(r"[A-Z]{3}[0-9]{4}", text):
|
|
return text
|
|
compact = re.sub(r"[^A-Z0-9]", "", text)
|
|
if re.fullmatch(r"[A-Z]{3}[0-9][A-Z0-9][0-9]{2}", compact) or re.fullmatch(r"[A-Z]{3}[0-9]{4}", compact):
|
|
return compact
|
|
return None
|
|
|
|
def _normalize_cpf(self, value) -> str | None:
|
|
digits = re.sub(r"\D", "", str(value or ""))
|
|
if len(digits) == 11:
|
|
return digits
|
|
return None
|
|
|
|
def _normalize_positive_number(self, value) -> float | None:
|
|
if value is None:
|
|
return None
|
|
if isinstance(value, (int, float)):
|
|
number = float(value)
|
|
return number if number > 0 else None
|
|
text = self._normalize_text(str(value))
|
|
text = text.replace("r$", "").strip()
|
|
multiplier = 1000 if "mil" in text else 1
|
|
text = text.replace("mil", "").strip()
|
|
digits = re.sub(r"[^0-9,.\s]", "", text)
|
|
if not digits:
|
|
return None
|
|
numeric = digits.replace(".", "").replace(" ", "").replace(",", ".")
|
|
try:
|
|
number = float(numeric) * multiplier
|
|
return number if number > 0 else None
|
|
except ValueError:
|
|
return None
|
|
|
|
def _normalize_vehicle_profile(self, value) -> list[str]:
|
|
if value is None:
|
|
return []
|
|
allowed = {"suv", "sedan", "hatch", "pickup"}
|
|
items = value if isinstance(value, list) else [value]
|
|
normalized: list[str] = []
|
|
for item in items:
|
|
marker = self._normalize_text(str(item)).strip()
|
|
if marker in allowed and marker not in normalized:
|
|
normalized.append(marker)
|
|
return normalized
|
|
|
|
def _normalize_bool(self, value) -> bool | None:
|
|
if isinstance(value, bool):
|
|
return value
|
|
lowered = self._normalize_text(str(value or "")).strip()
|
|
if lowered in {"sim", "true", "1", "yes"}:
|
|
return True
|
|
if lowered in {"nao", "false", "0", "no"}:
|
|
return False
|
|
return None
|
|
|
|
def _normalize_review_datetime_text(self, value) -> str | None:
|
|
text = str(value or "").strip()
|
|
if not text:
|
|
return None
|
|
|
|
# Mantem formatos absolutos que o handler ja sabe interpretar.
|
|
absolute_patterns = (
|
|
r"^\d{1,2}[/-]\d{1,2}[/-]\d{4}\s+(?:as\s+)?\d{1,2}:\d{2}(?::\d{2})?$",
|
|
r"^\d{4}[/-]\d{1,2}[/-]\d{1,2}\s+(?:as\s+)?\d{1,2}:\d{2}(?::\d{2})?$",
|
|
r"^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}(?::\d{2})?(?:Z|[+-]\d{2}:\d{2})?$",
|
|
)
|
|
if any(re.match(pattern, text, flags=re.IGNORECASE) for pattern in absolute_patterns):
|
|
return text
|
|
|
|
normalized = self._normalize_text(text)
|
|
day_offset = None
|
|
if "amanha" in normalized:
|
|
day_offset = 1
|
|
elif "hoje" in normalized:
|
|
day_offset = 0
|
|
if day_offset is None:
|
|
return text
|
|
|
|
time_match = re.search(r"\b([01]?\d|2[0-3]):([0-5]\d)\b", normalized)
|
|
if not time_match:
|
|
return text
|
|
|
|
hour = int(time_match.group(1))
|
|
minute = int(time_match.group(2))
|
|
target_date = datetime.now() + timedelta(days=day_offset)
|
|
return f"{target_date.strftime('%d/%m/%Y')} {hour:02d}:{minute:02d}"
|
|
|
|
def _normalize_generic_fields(self, data) -> dict:
|
|
if not isinstance(data, dict):
|
|
return {}
|
|
extracted: dict = {}
|
|
plate = self._normalize_plate(data.get("placa"))
|
|
if plate:
|
|
extracted["placa"] = plate
|
|
cpf = self._normalize_cpf(data.get("cpf"))
|
|
if cpf:
|
|
extracted["cpf"] = cpf
|
|
budget = self._normalize_positive_number(data.get("orcamento_max"))
|
|
if budget:
|
|
extracted["orcamento_max"] = int(round(budget))
|
|
profile = self._normalize_vehicle_profile(data.get("perfil_veiculo"))
|
|
if profile:
|
|
extracted["perfil_veiculo"] = profile
|
|
return extracted
|
|
|
|
def _normalize_review_fields(self, data) -> dict:
|
|
if not isinstance(data, dict):
|
|
return {}
|
|
extracted: dict = {}
|
|
plate = self._normalize_plate(data.get("placa"))
|
|
if plate:
|
|
extracted["placa"] = plate
|
|
date_time = self._normalize_review_datetime_text(data.get("data_hora"))
|
|
if date_time:
|
|
extracted["data_hora"] = date_time
|
|
model = str(data.get("modelo") or "").strip(" ,.;")
|
|
if model:
|
|
extracted["modelo"] = model.title()
|
|
year = self._normalize_positive_number(data.get("ano"))
|
|
if year:
|
|
year_int = int(round(year))
|
|
if 1900 <= year_int <= 2100:
|
|
extracted["ano"] = year_int
|
|
km = self._normalize_positive_number(data.get("km"))
|
|
if km:
|
|
extracted["km"] = int(round(km))
|
|
reviewed = self._normalize_bool(data.get("revisao_previa_concessionaria"))
|
|
if reviewed is not None:
|
|
extracted["revisao_previa_concessionaria"] = reviewed
|
|
return extracted
|
|
|
|
def _normalize_order_fields(self, data) -> dict:
|
|
if not isinstance(data, dict):
|
|
return {}
|
|
extracted: dict = {}
|
|
cpf = self._normalize_cpf(data.get("cpf"))
|
|
if cpf:
|
|
extracted["cpf"] = cpf
|
|
value = self._normalize_positive_number(data.get("valor_veiculo"))
|
|
if value:
|
|
extracted["valor_veiculo"] = round(value, 2)
|
|
return extracted
|
|
|
|
def _normalize_cancel_order_fields(self, data) -> dict:
|
|
if not isinstance(data, dict):
|
|
return {}
|
|
extracted: dict = {}
|
|
order_number = str(data.get("numero_pedido") or "").strip().upper()
|
|
if order_number and re.fullmatch(r"PED-[A-Z0-9\-]+", order_number):
|
|
extracted["numero_pedido"] = order_number
|
|
reason = str(data.get("motivo") or "").strip(" .;")
|
|
if reason:
|
|
extracted["motivo"] = reason
|
|
return extracted
|
|
|
|
def _normalize_intents(self, data) -> dict:
    """Return the four intent flags as strict booleans.

    A missing or unrecognised flag becomes ``False``; a non-dict input is
    treated as an empty dict.
    """
    source = data if isinstance(data, dict) else {}
    intent_names = ("review_schedule", "review_list", "order_create", "order_cancel")
    return {name: bool(self._normalize_bool(source.get(name))) for name in intent_names}
|
|
|
|
def _has_operational_intent(self, extracted_entities: dict | None) -> bool:
|
|
if not isinstance(extracted_entities, dict):
|
|
return False
|
|
intents = self._normalize_intents(extracted_entities.get("intents"))
|
|
if any(intents.values()):
|
|
return True
|
|
return any(
|
|
bool(extracted_entities.get(key))
|
|
for key in ("review_fields", "order_fields", "cancel_order_fields")
|
|
)
|
|
|
|
def _try_prefill_review_fields_from_memory(self, user_id: int | None, payload: dict) -> None:
|
|
if user_id is None:
|
|
return
|
|
context = self._get_user_context(user_id)
|
|
if not context:
|
|
return
|
|
memory = context.get("generic_memory", {})
|
|
if payload.get("placa") is None:
|
|
plate = self._normalize_plate(memory.get("placa"))
|
|
if plate:
|
|
payload["placa"] = plate
|
|
|
|
def _queue_order(self, user_id: int | None, domain: str, order_message: str) -> None:
    """Append a request to the user's order queue for later processing.

    Requests of the ``general`` domain are never queued, and nothing is
    queued when the user has no live context. Each entry stores a snapshot
    of the shared memory taken at queue time.
    """
    user_state = self._get_user_context(user_id)
    if not user_state or domain == "general":
        return
    pending = user_state.setdefault("order_queue", [])
    entry = {
        "domain": domain,
        "message": (order_message or "").strip(),
        "memory_seed": self._new_tab_memory(user_id=user_id),
        "created_at": datetime.utcnow().isoformat(),
    }
    pending.append(entry)
|
|
|
|
def _pop_next_order(self, user_id: int | None) -> dict | None:
    """Remove and return the oldest queued request, or ``None`` when none exists."""
    user_state = self._get_user_context(user_id)
    if not user_state:
        return None
    pending = user_state.setdefault("order_queue", [])
    return pending.pop(0) if pending else None
|
|
|
|
def _prepare_message_for_single_order(
    self,
    message: str,
    user_id: int | None,
    routing_plan: dict | None = None,
) -> tuple[str, str | None, str | None]:
    """Pick the single request to process now and queue the other ones.

    Args:
        message: Original user message.
        user_id: User identifier, or ``None``.
        routing_plan: Result of the routing step, expected as
            ``{"orders": [{"domain": ..., "message": ...}, ...]}``.

    Returns:
        A tuple ``(routing_message, queue_notice, early_response)``:
        ``routing_message`` is the text to keep processing,
        ``queue_notice`` is ``None`` on every path of this implementation,
        and ``early_response``, when set, is a prompt asking the user to
        finish the currently open flow first.
    """
    context = self._get_user_context(user_id)
    if not context:
        return message, None, None

    queue_notice = None
    active_domain = context.get("active_domain", "general")

    # Re-validate the routing plan: keep only dict items with non-empty
    # text and force unknown domains to "general".
    orders_raw = (routing_plan or {}).get("orders") if isinstance(routing_plan, dict) else None
    extracted_orders: list[dict] = []
    if isinstance(orders_raw, list):
        for item in orders_raw:
            if not isinstance(item, dict):
                continue
            domain = str(item.get("domain") or "general").strip().lower()
            if domain not in {"review", "sales", "general"}:
                domain = "general"
            segment = str(item.get("message") or "").strip()
            if segment:
                extracted_orders.append({"domain": domain, "message": segment})
    if not extracted_orders:
        extracted_orders = [{"domain": "general", "message": (message or "").strip()}]

    # Single request: it is only deferred when it targets another domain
    # while the active domain still has an open flow.
    if len(extracted_orders) <= 1:
        inferred = extracted_orders[0]["domain"]
        if (
            inferred != "general"
            and inferred != active_domain
            and self._has_open_flow(user_id=user_id, domain=active_domain)
        ):
            self._queue_order(user_id=user_id, domain=inferred, order_message=message)
            return (
                message,
                None,
                self._render_open_flow_prompt(user_id=user_id, domain=active_domain),
            )
        return message, None, None

    # Several requests while a flow is open: queue the ones from other
    # domains and ask the user to finish the open flow first.
    if self._has_open_flow(user_id=user_id, domain=active_domain):
        for queued in extracted_orders:
            if queued["domain"] != active_domain:
                self._queue_order(user_id=user_id, domain=queued["domain"], order_message=queued["message"])
        return (
            message,
            None,
            self._render_open_flow_prompt(user_id=user_id, domain=active_domain),
        )

    # Several requests and nothing open: process the first now and queue the
    # rest (``_queue_order`` itself skips "general" items).
    first = extracted_orders[0]
    for queued in extracted_orders[1:]:
        self._queue_order(user_id=user_id, domain=queued["domain"], order_message=queued["message"])
    context["active_domain"] = first["domain"]

    queue_notice = None
    return first["message"], queue_notice, None
|
|
|
|
def _compose_order_aware_response(self, response: str, user_id: int | None, queue_notice: str | None = None) -> str:
|
|
lines = []
|
|
if queue_notice:
|
|
lines.append(queue_notice)
|
|
lines.append(response)
|
|
return "\n".join(lines)
|
|
|
|
def _render_open_flow_prompt(self, user_id: int | None, domain: str) -> str:
|
|
if domain == "review" and user_id is not None:
|
|
draft = self.PENDING_REVIEW_DRAFTS.get(user_id)
|
|
if draft:
|
|
missing = [field for field in REVIEW_REQUIRED_FIELDS if field not in draft.get("payload", {})]
|
|
if missing:
|
|
return self._render_missing_review_fields_prompt(missing)
|
|
|
|
pending = self.PENDING_REVIEW_CONFIRMATIONS.get(user_id)
|
|
if pending:
|
|
return "Antes de mudar de assunto, me confirme se podemos concluir seu agendamento de revisao."
|
|
if domain == "sales" and user_id is not None:
|
|
draft = self.PENDING_ORDER_DRAFTS.get(user_id)
|
|
if draft:
|
|
missing = [field for field in ORDER_REQUIRED_FIELDS if field not in draft.get("payload", {})]
|
|
if missing:
|
|
return self._render_missing_order_fields_prompt(missing)
|
|
cancel_draft = self.PENDING_CANCEL_ORDER_DRAFTS.get(user_id)
|
|
if cancel_draft:
|
|
missing = [field for field in CANCEL_ORDER_REQUIRED_FIELDS if field not in cancel_draft.get("payload", {})]
|
|
if missing:
|
|
return self._render_missing_cancel_order_fields_prompt(missing)
|
|
|
|
return "Vamos concluir este assunto primeiro e ja sigo com o proximo em seguida."
|
|
|
|
def _build_next_order_transition(self, domain: str) -> str:
|
|
if domain == "sales":
|
|
return "Agora, sobre a compra do veiculo:"
|
|
if domain == "review":
|
|
return "Agora, sobre o agendamento da revisao:"
|
|
return "Agora, sobre o proximo assunto:"
|
|
|
|
async def _maybe_auto_advance_next_order(self, base_response: str, user_id: int | None) -> str:
    """Process the next queued request, if possible, and append its reply.

    The base response is returned untouched when there is no live context,
    a context switch is awaiting confirmation, the active domain still has
    an open flow, or the queue is empty. Otherwise the next request becomes
    the active domain, its memory snapshot is restored and the request is
    run through ``handle_message``.
    """
    user_state = self._get_user_context(user_id)
    if not user_state or user_state.get("pending_switch"):
        return base_response

    current_domain = user_state.get("active_domain", "general")
    if self._has_open_flow(user_id=user_id, domain=current_domain):
        return base_response

    upcoming = self._pop_next_order(user_id=user_id)
    if not upcoming:
        return base_response

    user_state["active_domain"] = upcoming["domain"]
    # Prefer the snapshot taken at queue time; fall back to the current shared memory.
    user_state["generic_memory"] = dict(upcoming.get("memory_seed") or self._new_tab_memory(user_id=user_id))
    user_state["pending_switch"] = None
    follow_up = await self.handle_message(upcoming["message"], user_id=user_id)
    heading = self._build_next_order_transition(upcoming["domain"])
    return f"{base_response}\n\n{heading}\n{follow_up}"
|
|
|
|
def _domain_from_intents(self, intents: dict | None) -> str:
    """Infer the conversation domain from the intent flags.

    ``review`` wins when more review intents than sales intents are set,
    ``sales`` in the opposite case; ties (including no intent at all)
    give ``general``.
    """
    flags = self._normalize_intents(intents)
    review_score = sum(int(flags.get(name, False)) for name in ("review_schedule", "review_list"))
    sales_score = sum(int(flags.get(name, False)) for name in ("order_create", "order_cancel"))
    # Scores are never negative, so a strictly larger score is also > 0.
    if review_score > sales_score:
        return "review"
    if sales_score > review_score:
        return "sales"
    return "general"
|
|
|
|
def _is_context_switch_confirmation(self, message: str) -> bool:
    """Tell whether the message is an affirmative or a negative answer."""
    affirmative = self._is_affirmative_message(message)
    if affirmative:
        return affirmative
    return self._is_negative_message(message)
|
|
|
|
def _has_open_flow(self, user_id: int | None, domain: str) -> bool:
|
|
if user_id is None:
|
|
return False
|
|
if domain == "review":
|
|
return bool(
|
|
self.PENDING_REVIEW_DRAFTS.get(user_id)
|
|
or self.PENDING_REVIEW_CONFIRMATIONS.get(user_id)
|
|
)
|
|
if domain == "sales":
|
|
return bool(
|
|
self.PENDING_ORDER_DRAFTS.get(user_id)
|
|
or self.PENDING_CANCEL_ORDER_DRAFTS.get(user_id)
|
|
)
|
|
return False
|
|
|
|
def _apply_domain_switch(self, user_id: int | None, target_domain: str) -> None:
    """Abandon the current domain's pending state and activate ``target_domain``.

    The generic memory is re-seeded from the shared memory and any pending
    switch request is cleared. Does nothing without a live context.
    """
    user_state = self._get_user_context(user_id)
    if not user_state:
        return
    leaving = user_state.get("active_domain", "general")
    if leaving == "review":
        self._reset_pending_review_states(user_id=user_id)
    elif leaving == "sales":
        self._reset_pending_order_states(user_id=user_id)
    user_state["active_domain"] = target_domain
    user_state["generic_memory"] = self._new_tab_memory(user_id=user_id)
    user_state["pending_switch"] = None
|
|
|
|
def _handle_context_switch(
    self,
    message: str,
    user_id: int | None,
    target_domain_hint: str = "general",
) -> str | None:
    """Manage the "are you sure you want to change subject?" handshake.

    When a switch request is pending and not expired, an affirmative
    message applies the switch and a negative one cancels it. Otherwise, a
    new switch request is recorded (valid for 15 minutes) when the hinted
    domain differs from the active one and the active one has an open flow.

    Returns:
        The text to send to the user, or ``None`` when the message should
        keep flowing through the normal pipeline.
    """
    user_state = self._get_user_context(user_id)
    if not user_state:
        return None

    pending_switch = user_state.get("pending_switch")
    if pending_switch:
        if pending_switch["expires_at"] < datetime.utcnow():
            # Stale request: forget it and evaluate the message normally.
            user_state["pending_switch"] = None
        elif self._is_context_switch_confirmation(message):
            if not self._is_affirmative_message(message):
                user_state["pending_switch"] = None
                return "Perfeito, vamos continuar no fluxo atual."
            chosen_domain = pending_switch["target_domain"]
            self._apply_domain_switch(user_id=user_id, target_domain=chosen_domain)
            return self._render_context_switched_message(target_domain=chosen_domain)

    active = user_state.get("active_domain", "general")
    requested = target_domain_hint
    if requested == "general" or requested == active:
        return None
    if not self._has_open_flow(user_id=user_id, domain=active):
        return None

    user_state["pending_switch"] = {
        "source_domain": active,
        "target_domain": requested,
        "expires_at": datetime.utcnow() + timedelta(minutes=15),
    }
    return self._render_context_switch_confirmation(
        source_domain=active,
        target_domain=requested,
    )
|
|
|
|
def _update_active_domain(self, user_id: int | None, domain_hint: str = "general") -> None:
    """Set the active domain to the hint unless the hint is ``general``.

    Does nothing when the user has no live context.
    """
    user_state = self._get_user_context(user_id)
    if user_state and domain_hint != "general":
        user_state["active_domain"] = domain_hint
|
|
|
|
def _domain_label(self, domain: str) -> str:
|
|
labels = {
|
|
"review": "agendamento de revisao",
|
|
"sales": "compra de veiculo",
|
|
"general": "atendimento geral",
|
|
}
|
|
return labels.get(domain, "atendimento")
|
|
|
|
def _render_context_switch_confirmation(self, source_domain: str, target_domain: str) -> str:
    """Build the question asking the user to confirm leaving one domain for another."""
    leaving = self._domain_label(source_domain)
    entering = self._domain_label(target_domain)
    question = (
        f"Entendi que voce quer sair de {leaving} "
        f"e ir para {entering}. Tem certeza?"
    )
    return question
|
|
|
|
def _render_context_switched_message(self, target_domain: str) -> str:
    """Build the message confirming that the conversation moved to ``target_domain``."""
    entering = self._domain_label(target_domain)
    return f"Certo, contexto anterior encerrado. Vamos seguir com {entering}."
|
|
|
|
def _build_context_summary(self, user_id: int | None) -> str:
|
|
context = self._get_user_context(user_id)
|
|
if not context:
|
|
return "Contexto de conversa: sem contexto ativo."
|
|
|
|
domain = context.get("active_domain", "general")
|
|
memory = context.get("generic_memory", {})
|
|
order_queue = context.get("order_queue", [])
|
|
summary = [f"Contexto de conversa ativo: {self._domain_label(domain)}."]
|
|
if memory:
|
|
summary.append(f"Memoria generica temporaria: {memory}.")
|
|
if order_queue:
|
|
summary.append(f"Fila de pedidos pendentes: {len(order_queue)}.")
|
|
return " ".join(summary)
|
|
|
|
def _should_use_deterministic_response(self, tool_name: str) -> bool:
    """Tell whether ``tool_name`` is listed in ``DETERMINISTIC_RESPONSE_TOOLS``."""
    is_listed = tool_name in DETERMINISTIC_RESPONSE_TOOLS
    return is_listed
|
|
|
|
def _normalize_text(self, text: str) -> str:
    """Return ``text`` lowercased with accents removed; ``None``/empty give ``""``.

    The text is NFKD-decomposed and then encoded to ASCII with errors ignored,
    so characters that have no ASCII decomposition are dropped.
    """
    decomposed = unicodedata.normalize("NFKD", text if text else "")
    return decomposed.encode("ascii", "ignore").decode("ascii").lower()
|
|
|
|
def _is_low_value_response(self, text: str) -> bool:
    """Tell whether ``text``, trimmed and lowercased, is one of ``LOW_VALUE_RESPONSES``."""
    candidate = text.strip().lower()
    return candidate in LOW_VALUE_RESPONSES
|
|
|
|
async def _try_handle_review_management(
|
|
self,
|
|
message: str,
|
|
user_id: int | None,
|
|
intents: dict | None = None,
|
|
) -> str | None:
|
|
if user_id is None:
|
|
return None
|
|
normalized_intents = self._normalize_intents(intents)
|
|
if not normalized_intents.get("review_list", False):
|
|
return None
|
|
|
|
# Se o usuario pediu listagem, encerramos coleta pendente para nao competir com o fluxo.
|
|
self._reset_pending_review_states(user_id=user_id)
|
|
try:
|
|
tool_result = await self.registry.execute(
|
|
"listar_agendamentos_revisao",
|
|
{"limite": 20},
|
|
user_id=user_id,
|
|
)
|
|
except HTTPException as exc:
|
|
return self._http_exception_detail(exc)
|
|
return self._fallback_format_tool_result("listar_agendamentos_revisao", tool_result)
|
|
|
|
def _render_missing_review_fields_prompt(self, missing_fields: list[str]) -> str:
    """Build the prompt listing the review-scheduling fields still missing.

    Args:
        missing_fields: Names of the required fields not collected yet.

    Returns:
        A header line followed by one ``- <description>`` line per field.
    """
    labels = {
        "placa": "a placa do veiculo",
        "data_hora": "a data e hora desejada para a revisao",
        "modelo": "o modelo do veiculo",
        "ano": "o ano do veiculo",
        "km": "a quilometragem atual (km)",
        "revisao_previa_concessionaria": "se ja fez revisao na concessionaria (sim/nao)",
    }
    # The required-field list is imported from configuration while the labels
    # are hard-coded here. A field without a label is shown by its raw name
    # instead of raising KeyError in the middle of the conversation.
    itens = [f"- {labels.get(field, field)}" for field in missing_fields]
    return "Para agendar sua revisao, preciso dos dados abaixo:\n" + "\n".join(itens)
|
|
|
|
def _is_valid_cpf(self, cpf: str) -> bool:
    """Check a CPF by its length and its two check digits.

    Every non-digit character is ignored. The value is rejected when it does
    not have exactly 11 digits, when all digits are the same, or when either
    check digit (positions 10 and 11) does not match the weighted sum of the
    digits before it.
    """
    only_digits = "".join(re.findall(r"\d", cpf or ""))
    if len(only_digits) != 11 or len(set(only_digits)) == 1:
        return False

    values = list(map(int, only_digits))
    # Check digit at index 9 uses weights 10..2 over the first 9 digits;
    # check digit at index 10 uses weights 11..2 over the first 10 digits.
    for position in (9, 10):
        weighted = sum(
            value * (position + 1 - offset)
            for offset, value in enumerate(values[:position])
        )
        expected = 11 - (weighted % 11)
        if expected >= 10:
            expected = 0
        if expected != values[position]:
            return False
    return True
|
|
|
|
def _try_prefill_order_value_from_memory(self, user_id: int | None, payload: dict) -> None:
|
|
# So preenche quando o usuario ainda nao informou valor explicitamente no fluxo atual.
|
|
if user_id is None or payload.get("valor_veiculo") is not None:
|
|
return
|
|
|
|
context = self._get_user_context(user_id)
|
|
if not context:
|
|
return
|
|
memory = context.get("generic_memory", {})
|
|
budget = memory.get("orcamento_max")
|
|
if isinstance(budget, (int, float)) and budget > 0:
|
|
# Reaproveita o orcamento capturado anteriormente como valor base do pedido.
|
|
payload["valor_veiculo"] = float(budget)
|
|
|
|
def _render_missing_order_fields_prompt(self, missing_fields: list[str]) -> str:
    """Build the prompt listing the order fields still missing.

    Args:
        missing_fields: Names of the required fields not collected yet.

    Returns:
        A header line followed by one ``- <description>`` line per field.
    """
    labels = {
        "cpf": "o CPF do cliente",
        "valor_veiculo": "o valor do veiculo (R$)",
    }
    # The required-field list is imported from configuration while the labels
    # are hard-coded here. A field without a label is shown by its raw name
    # instead of raising KeyError in the middle of the conversation.
    itens = [f"- {labels.get(field, field)}" for field in missing_fields]
    return "Para realizar o pedido, preciso dos dados abaixo:\n" + "\n".join(itens)
|
|
|
|
def _render_missing_cancel_order_fields_prompt(self, missing_fields: list[str]) -> str:
    """Build the prompt listing the order-cancellation fields still missing.

    Args:
        missing_fields: Names of the required fields not collected yet.

    Returns:
        A header line followed by one ``- <description>`` line per field.
    """
    labels = {
        "numero_pedido": "o numero do pedido (ex.: PED-20260305123456-ABC123)",
        "motivo": "o motivo do cancelamento",
    }
    # The required-field list is imported from configuration while the labels
    # are hard-coded here. A field without a label is shown by its raw name
    # instead of raising KeyError in the middle of the conversation.
    itens = [f"- {labels.get(field, field)}" for field in missing_fields]
    return "Para cancelar o pedido, preciso dos dados abaixo:\n" + "\n".join(itens)
|
|
|
|
# Em vez de tentar entender tudo de uma vez, o bot mantem um "estado" do que ja sabe e vai perguntando apenas o que falta (os "slots" vazios) ate que a tarefa possa ser completada.
|
|
async def _try_collect_and_schedule_review(
|
|
self,
|
|
message: str,
|
|
user_id: int | None,
|
|
extracted_fields: dict | None = None,
|
|
intents: dict | None = None,
|
|
) -> str | None:
|
|
if user_id is None:
|
|
return None
|
|
|
|
normalized_intents = self._normalize_intents(intents)
|
|
has_intent = normalized_intents.get("review_schedule", False)
|
|
has_management_intent = normalized_intents.get("review_list", False)
|
|
|
|
# Nao inicia slot-filling quando a intencao atual nao e de agendamento.
|
|
if has_management_intent:
|
|
# Se o usuario mudou para gerenciamento de revisao, encerra
|
|
# qualquer coleta pendente de novo agendamento.
|
|
self.PENDING_REVIEW_DRAFTS.pop(user_id, None)
|
|
return None
|
|
|
|
# Reaproveita rascunho anterior do usuario, se ainda estiver valido.
|
|
draft = self.PENDING_REVIEW_DRAFTS.get(user_id)
|
|
if draft and draft["expires_at"] < datetime.utcnow():
|
|
self.PENDING_REVIEW_DRAFTS.pop(user_id, None)
|
|
draft = None
|
|
|
|
extracted = self._normalize_review_fields(extracted_fields)
|
|
|
|
# Se houver rascunho de revisao, mas o usuario mudou para outra
|
|
# intencao operacional (ex.: compra/estoque), descarta o rascunho.
|
|
if (
|
|
draft
|
|
and not has_intent
|
|
and (
|
|
normalized_intents.get("order_create", False)
|
|
or normalized_intents.get("order_cancel", False)
|
|
)
|
|
and not extracted
|
|
):
|
|
self.PENDING_REVIEW_DRAFTS.pop(user_id, None)
|
|
return None
|
|
|
|
# Sem intencao de revisao e sem rascunho aberto: nao interfere no fluxo normal.
|
|
if not has_intent and draft is None:
|
|
return None
|
|
|
|
if draft is None:
|
|
draft = {"payload": {}, "expires_at": datetime.utcnow() + timedelta(minutes=PENDING_REVIEW_DRAFT_TTL_MINUTES)}
|
|
|
|
# Merge incremental: apenas atualiza os campos detectados na mensagem atual.
|
|
draft["payload"].update(extracted)
|
|
self._try_prefill_review_fields_from_memory(user_id=user_id, payload=draft["payload"])
|
|
# Se o usuario responder apenas "sim/nao" no follow-up, preenche o slot booleano.
|
|
if (
|
|
"revisao_previa_concessionaria" not in draft["payload"]
|
|
and draft["payload"]
|
|
and not extracted
|
|
):
|
|
if self._is_affirmative_message(message):
|
|
draft["payload"]["revisao_previa_concessionaria"] = True
|
|
elif self._is_negative_message(message):
|
|
draft["payload"]["revisao_previa_concessionaria"] = False
|
|
draft["expires_at"] = datetime.utcnow() + timedelta(minutes=PENDING_REVIEW_DRAFT_TTL_MINUTES)
|
|
self.PENDING_REVIEW_DRAFTS[user_id] = draft
|
|
|
|
# Enquanto faltar campo obrigatorio, responde de forma deterministica
|
|
# (sem depender do LLM para lembrar contexto).
|
|
missing = [field for field in REVIEW_REQUIRED_FIELDS if field not in draft["payload"]]
|
|
if missing:
|
|
return self._render_missing_review_fields_prompt(missing)
|
|
|
|
try:
|
|
# Com payload completo, executa a tool de agendamento.
|
|
tool_result = await self.registry.execute(
|
|
"agendar_revisao",
|
|
draft["payload"],
|
|
user_id=user_id,
|
|
)
|
|
except HTTPException as exc:
|
|
# Se houver conflito com sugestao de horario, salva para confirmar com "pode/sim".
|
|
self._capture_review_confirmation_suggestion(
|
|
tool_name="agendar_revisao",
|
|
arguments=draft["payload"],
|
|
exc=exc,
|
|
user_id=user_id,
|
|
)
|
|
return self._http_exception_detail(exc)
|
|
finally:
|
|
# Limpa o rascunho apos tentativa final para evitar estado sujo.
|
|
self.PENDING_REVIEW_DRAFTS.pop(user_id, None)
|
|
|
|
return self._fallback_format_tool_result("agendar_revisao", tool_result)
|
|
|
|
async def _try_collect_and_create_order(
|
|
self,
|
|
message: str,
|
|
user_id: int | None,
|
|
extracted_fields: dict | None = None,
|
|
intents: dict | None = None,
|
|
) -> str | None:
|
|
if user_id is None:
|
|
return None
|
|
|
|
normalized_intents = self._normalize_intents(intents)
|
|
draft = self.PENDING_ORDER_DRAFTS.get(user_id)
|
|
if draft and draft["expires_at"] < datetime.utcnow():
|
|
self.PENDING_ORDER_DRAFTS.pop(user_id, None)
|
|
draft = None
|
|
|
|
extracted = self._normalize_order_fields(extracted_fields)
|
|
has_intent = normalized_intents.get("order_create", False)
|
|
|
|
if (
|
|
draft
|
|
and not has_intent
|
|
and (
|
|
normalized_intents.get("review_schedule", False)
|
|
or normalized_intents.get("review_list", False)
|
|
or normalized_intents.get("order_cancel", False)
|
|
)
|
|
and not extracted
|
|
):
|
|
self.PENDING_ORDER_DRAFTS.pop(user_id, None)
|
|
return None
|
|
|
|
if not has_intent and draft is None:
|
|
return None
|
|
|
|
if draft is None:
|
|
draft = {
|
|
"payload": {},
|
|
"expires_at": datetime.utcnow() + timedelta(minutes=PENDING_ORDER_DRAFT_TTL_MINUTES),
|
|
}
|
|
|
|
draft["payload"].update(extracted)
|
|
self._try_prefill_order_value_from_memory(user_id=user_id, payload=draft["payload"])
|
|
|
|
cpf_value = draft["payload"].get("cpf")
|
|
if cpf_value and not self._is_valid_cpf(str(cpf_value)):
|
|
draft["payload"].pop("cpf", None)
|
|
self.PENDING_ORDER_DRAFTS[user_id] = draft
|
|
return "Para seguir com o pedido, preciso de um CPF valido com 11 digitos."
|
|
|
|
valor = draft["payload"].get("valor_veiculo")
|
|
if valor is not None:
|
|
try:
|
|
parsed = float(valor)
|
|
if parsed <= 0:
|
|
draft["payload"].pop("valor_veiculo", None)
|
|
else:
|
|
draft["payload"]["valor_veiculo"] = round(parsed, 2)
|
|
except (TypeError, ValueError):
|
|
draft["payload"].pop("valor_veiculo", None)
|
|
|
|
draft["expires_at"] = datetime.utcnow() + timedelta(minutes=PENDING_ORDER_DRAFT_TTL_MINUTES)
|
|
self.PENDING_ORDER_DRAFTS[user_id] = draft
|
|
|
|
missing = [field for field in ORDER_REQUIRED_FIELDS if field not in draft["payload"]]
|
|
if missing:
|
|
return self._render_missing_order_fields_prompt(missing)
|
|
|
|
try:
|
|
tool_result = await self.registry.execute(
|
|
"realizar_pedido",
|
|
draft["payload"],
|
|
user_id=user_id,
|
|
)
|
|
except HTTPException as exc:
|
|
return self._http_exception_detail(exc)
|
|
finally:
|
|
self.PENDING_ORDER_DRAFTS.pop(user_id, None)
|
|
|
|
return self._fallback_format_tool_result("realizar_pedido", tool_result)
|
|
|
|
async def _try_collect_and_cancel_order(
|
|
self,
|
|
message: str,
|
|
user_id: int | None,
|
|
extracted_fields: dict | None = None,
|
|
intents: dict | None = None,
|
|
) -> str | None:
|
|
if user_id is None:
|
|
return None
|
|
|
|
normalized_intents = self._normalize_intents(intents)
|
|
draft = self.PENDING_CANCEL_ORDER_DRAFTS.get(user_id)
|
|
if draft and draft["expires_at"] < datetime.utcnow():
|
|
self.PENDING_CANCEL_ORDER_DRAFTS.pop(user_id, None)
|
|
draft = None
|
|
|
|
extracted = self._normalize_cancel_order_fields(extracted_fields)
|
|
has_intent = normalized_intents.get("order_cancel", False)
|
|
|
|
if (
|
|
draft
|
|
and not has_intent
|
|
and (
|
|
normalized_intents.get("review_schedule", False)
|
|
or normalized_intents.get("review_list", False)
|
|
or normalized_intents.get("order_create", False)
|
|
)
|
|
and not extracted
|
|
):
|
|
self.PENDING_CANCEL_ORDER_DRAFTS.pop(user_id, None)
|
|
return None
|
|
|
|
if not has_intent and draft is None:
|
|
return None
|
|
|
|
if draft is None:
|
|
draft = {
|
|
"payload": {},
|
|
"expires_at": datetime.utcnow() + timedelta(minutes=PENDING_CANCEL_ORDER_DRAFT_TTL_MINUTES),
|
|
}
|
|
|
|
if (
|
|
"motivo" not in extracted
|
|
and draft["payload"].get("numero_pedido")
|
|
and not has_intent
|
|
):
|
|
free_text = (message or "").strip()
|
|
if free_text and len(free_text) >= 4:
|
|
extracted["motivo"] = free_text
|
|
|
|
draft["payload"].update(extracted)
|
|
draft["expires_at"] = datetime.utcnow() + timedelta(minutes=PENDING_CANCEL_ORDER_DRAFT_TTL_MINUTES)
|
|
self.PENDING_CANCEL_ORDER_DRAFTS[user_id] = draft
|
|
|
|
missing = [field for field in CANCEL_ORDER_REQUIRED_FIELDS if field not in draft["payload"]]
|
|
if missing:
|
|
return self._render_missing_cancel_order_fields_prompt(missing)
|
|
|
|
try:
|
|
tool_result = await self.registry.execute(
|
|
"cancelar_pedido",
|
|
draft["payload"],
|
|
user_id=user_id,
|
|
)
|
|
except HTTPException as exc:
|
|
return self._http_exception_detail(exc)
|
|
finally:
|
|
self.PENDING_CANCEL_ORDER_DRAFTS.pop(user_id, None)
|
|
|
|
return self._fallback_format_tool_result("cancelar_pedido", tool_result)
|
|
|
|
def _is_affirmative_message(self, text: str) -> bool:
    """Tell whether the whole message is one of the accepted "yes" replies.

    The text is normalized, trimmed and stripped of trailing punctuation
    before being compared against the accepted replies.
    """
    accepted = (
        "sim",
        "pode",
        "ok",
        "confirmo",
        "aceito",
        "fechado",
        "pode sim",
        "tenho",
        "tenho sim",
    )
    cleaned = re.sub(r"[.!?,;:]+$", "", self._normalize_text(text).strip())
    return cleaned in accepted
|
|
|
|
def _is_negative_message(self, text: str) -> bool:
    """Tell whether the message is a refusal.

    After normalizing, trimming and stripping trailing punctuation, the text
    counts as negative when it starts with "nao" or equals one of the listed
    refusal phrases.
    """
    cleaned = re.sub(r"[.!?,;:]+$", "", self._normalize_text(text).strip())
    if cleaned.startswith("nao"):
        return True
    return cleaned in {"nao", "nao quero", "prefiro outro", "outro horario"}
|
|
|
|
def _extract_time_only(self, text: str) -> str | None:
|
|
match = re.search(r"\b([01]?\d|2[0-3]):([0-5]\d)\b", text or "")
|
|
if not match:
|
|
return None
|
|
return f"{int(match.group(1)):02d}:{match.group(2)}"
|
|
|
|
def _merge_date_with_time(self, base_iso: str, new_time_hhmm: str) -> str | None:
|
|
try:
|
|
base_dt = datetime.fromisoformat((base_iso or "").replace("Z", "+00:00"))
|
|
except ValueError:
|
|
return None
|
|
try:
|
|
hour_text, minute_text = new_time_hhmm.split(":")
|
|
merged = base_dt.replace(hour=int(hour_text), minute=int(minute_text), second=0, microsecond=0)
|
|
return merged.isoformat()
|
|
except Exception:
|
|
return None
|
|
|
|
def _capture_review_confirmation_suggestion(
    self,
    tool_name: str,
    arguments: dict,
    exc: HTTPException,
    user_id: int | None,
) -> None:
    """Remember the time suggested by a 409 conflict so the user can confirm it.

    Only acts for the ``agendar_revisao`` tool, a known user and status code
    409. The suggestion is read from the ``ISO: <value>)`` fragment of the
    error detail; when found and the arguments include a ``placa``, a copy of
    the arguments with ``data_hora`` replaced by the suggestion is stored in
    ``PENDING_REVIEW_CONFIRMATIONS`` with a fresh expiry.
    """
    if tool_name != "agendar_revisao":
        return
    if user_id is None:
        return
    if exc.status_code != 409:
        return

    detail_text = exc.detail if isinstance(exc.detail, str) else ""
    found = re.search(r"ISO:\s*([^)]+)\)", detail_text)
    if not found:
        return

    # Work on a copy so the caller's arguments are not modified.
    candidate = dict(arguments or {})
    if not candidate.get("placa"):
        return
    candidate["data_hora"] = found.group(1).strip()

    self.PENDING_REVIEW_CONFIRMATIONS[user_id] = {
        "payload": candidate,
        "expires_at": datetime.utcnow() + timedelta(minutes=PENDING_REVIEW_TTL_MINUTES),
    }
|
|
|
|
async def _try_confirm_pending_review(
|
|
self,
|
|
message: str,
|
|
user_id: int | None,
|
|
extracted_review_fields: dict | None = None,
|
|
) -> str | None:
|
|
if user_id is None:
|
|
return None
|
|
pending = self.PENDING_REVIEW_CONFIRMATIONS.get(user_id)
|
|
if not pending:
|
|
return None
|
|
|
|
time_only = self._extract_time_only(message)
|
|
if self._is_negative_message(message) or time_only:
|
|
# Se o usuario recusar a sugestao e informar novo horario, reaproveita
|
|
# o payload pendente com a nova data/hora.
|
|
extracted = self._normalize_review_fields(extracted_review_fields)
|
|
new_data_hora = extracted.get("data_hora")
|
|
if not new_data_hora and time_only:
|
|
new_data_hora = self._merge_date_with_time(pending["payload"].get("data_hora", ""), time_only)
|
|
if not new_data_hora:
|
|
self.PENDING_REVIEW_CONFIRMATIONS.pop(user_id, None)
|
|
return "Sem problema. Me informe a nova data e hora desejada para a revisao."
|
|
|
|
payload = dict(pending["payload"])
|
|
payload["data_hora"] = new_data_hora
|
|
try:
|
|
tool_result = await self.registry.execute(
|
|
"agendar_revisao",
|
|
payload,
|
|
user_id=user_id,
|
|
)
|
|
except HTTPException as exc:
|
|
self.PENDING_REVIEW_CONFIRMATIONS.pop(user_id, None)
|
|
self._capture_review_confirmation_suggestion(
|
|
tool_name="agendar_revisao",
|
|
arguments=payload,
|
|
exc=exc,
|
|
user_id=user_id,
|
|
)
|
|
return self._http_exception_detail(exc)
|
|
|
|
self.PENDING_REVIEW_CONFIRMATIONS.pop(user_id, None)
|
|
return self._fallback_format_tool_result("agendar_revisao", tool_result)
|
|
|
|
if not self._is_affirmative_message(message):
|
|
return None
|
|
if pending["expires_at"] < datetime.utcnow():
|
|
self.PENDING_REVIEW_CONFIRMATIONS.pop(user_id, None)
|
|
return None
|
|
|
|
try:
|
|
tool_result = await self.registry.execute(
|
|
"agendar_revisao",
|
|
pending["payload"],
|
|
user_id=user_id,
|
|
)
|
|
except HTTPException as exc:
|
|
self.PENDING_REVIEW_CONFIRMATIONS.pop(user_id, None)
|
|
return self._http_exception_detail(exc)
|
|
|
|
self.PENDING_REVIEW_CONFIRMATIONS.pop(user_id, None)
|
|
return self._fallback_format_tool_result("agendar_revisao", tool_result)
|
|
|
|
def _build_router_prompt(self, user_message: str, user_id: int | None) -> str:
|
|
user_context = f"Contexto de usuario autenticado: user_id={user_id}.\n" if user_id else ""
|
|
conversation_context = self._build_context_summary(user_id=user_id)
|
|
return (
|
|
"Voce e um assistente de concessionaria. "
|
|
"Sempre que a solicitacao depender de dados operacionais (estoque, validacao de cliente, "
|
|
"avaliacao de troca, agendamento de revisao, realizacao ou cancelamento de pedido), use a tool correta. "
|
|
"Se faltar parametro obrigatorio para a tool, responda em texto pedindo apenas o que falta.\n\n"
|
|
f"{user_context}"
|
|
f"{conversation_context}\n"
|
|
f"Mensagem do usuario: {user_message}"
|
|
)
|
|
|
|
def _build_force_tool_prompt(self, user_message: str, user_id: int | None) -> str:
|
|
user_context = f"Contexto de usuario autenticado: user_id={user_id}.\n" if user_id else ""
|
|
conversation_context = self._build_context_summary(user_id=user_id)
|
|
return (
|
|
"Reavalie a mensagem e priorize chamar tool se houver intencao operacional. "
|
|
"Use texto apenas quando faltar dado obrigatorio.\n\n"
|
|
f"{user_context}"
|
|
f"{conversation_context}\n"
|
|
f"Mensagem do usuario: {user_message}"
|
|
)
|
|
|
|
def _build_result_prompt(
|
|
self,
|
|
user_message: str,
|
|
user_id: int | None,
|
|
tool_name: str,
|
|
tool_result,
|
|
) -> str:
|
|
user_context = f"Contexto de usuario autenticado: user_id={user_id}.\n" if user_id else ""
|
|
conversation_context = self._build_context_summary(user_id=user_id)
|
|
return (
|
|
"Responda ao usuario de forma objetiva usando o resultado da tool abaixo. "
|
|
"Nao invente dados. Se a lista vier vazia, diga explicitamente que nao encontrou resultados. "
|
|
"Retorne texto puro sem markdown, sem asteriscos, sem emojis e com linhas curtas.\n\n"
|
|
f"{user_context}"
|
|
f"{conversation_context}\n"
|
|
f"Pergunta original: {user_message}\n"
|
|
f"Tool executada: {tool_name}\n"
|
|
f"Resultado da tool: {tool_result}"
|
|
)
|
|
|
|
def _http_exception_detail(self, exc: HTTPException) -> str:
    """Return the exception's ``detail`` when it is text, else a generic failure message."""
    message = exc.detail
    return message if isinstance(message, str) else "Nao foi possivel concluir a operacao solicitada."
|
|
|
|
def _format_datetime_for_chat(self, value: str) -> str:
    """Format an ISO datetime as ``DD/MM/YYYY HH:MM`` for chat messages.

    A trailing ``Z`` is read as ``+00:00``. When the value cannot be parsed it
    is returned unchanged, or ``"N/A"`` when it is empty/``None``.
    """
    try:
        iso_text = (value or "").replace("Z", "+00:00")
        return datetime.fromisoformat(iso_text).strftime("%d/%m/%Y %H:%M")
    except Exception:
        return value or "N/A"
|
|
|
|
def _format_currency_br(self, value) -> str:
    """Format a number as Brazilian currency, e.g. ``R$ 1.234,56``.

    Anything that cannot be converted to ``float`` yields ``"N/A"``.
    """
    try:
        amount = float(value)
        us_style = f"{amount:,.2f}"
        # Swap the thousands and decimal separators in one pass.
        return "R$ " + us_style.translate(str.maketrans(",.", ".,"))
    except Exception:
        return "N/A"
|
|
|
|
def _fallback_format_tool_result(self, tool_name: str, tool_result) -> str:
    """Render a tool result as plain chat text without involving the LLM.

    Each known tool has its own layout, chosen by ``tool_name`` together with
    the result's type (list or dict). Missing fields are shown as ``N/A``.
    Any other combination yields a generic success message.
    """
    generic_ok = "Operacao concluida com sucesso."

    if isinstance(tool_result, list):
        if tool_name == "consultar_estoque":
            if not tool_result:
                return "Nao encontrei nenhum veiculo com os criterios informados."
            linhas = [f"Encontrei {len(tool_result)} veiculo(s):"]
            # Only the first 10 vehicles are listed; the rest are counted.
            for pos, car in enumerate(tool_result[:10], start=1):
                modelo = car.get("modelo", "N/A")
                categoria = car.get("categoria", "N/A")
                preco = self._format_currency_br(car.get("preco"))
                linhas.append(f"{pos}. {modelo} ({categoria}) - {preco}")
            hidden = len(tool_result) - 10
            if hidden > 0:
                linhas.append(f"... e mais {hidden} veiculo(s).")
            return "\n".join(linhas)

        if tool_name == "listar_agendamentos_revisao":
            if not tool_result:
                return "Nao encontrei agendamentos de revisao para sua conta."
            # Only the first 12 appointments are listed, one three-line entry
            # each, with a blank line between entries.
            entries = []
            for pos, booking in enumerate(tool_result[:12], start=1):
                protocolo = booking.get("protocolo", "N/A")
                placa = booking.get("placa", "N/A")
                data_hora = self._format_datetime_for_chat(booking.get("data_hora", "N/A"))
                status = booking.get("status", "N/A")
                entries.append(
                    f"{pos}) Protocolo: {protocolo}\n"
                    f"Placa: {placa}\n"
                    f"Data/Hora: {data_hora} | Status: {status}"
                )
            texto = f"Voce tem {len(tool_result)} agendamento(s):\n" + "\n\n".join(entries)
            hidden = len(tool_result) - 12
            if hidden > 0:
                texto += f"\n\n... e mais {hidden} agendamento(s)."
            return texto

        return generic_ok

    if not isinstance(tool_result, dict):
        return generic_ok

    campo = tool_result.get

    if tool_name == "cancelar_pedido":
        linhas = [
            f"Pedido {campo('numero_pedido', 'N/A')} atualizado.",
            f"Status: {campo('status', 'N/A')}",
        ]
        motivo = campo("motivo")
        if motivo:
            linhas.append(f"Motivo: {motivo}")
        return "\n".join(linhas)

    if tool_name == "realizar_pedido":
        numero = campo("numero_pedido", "N/A")
        valor = self._format_currency_br(campo("valor_veiculo"))
        return f"Pedido criado com sucesso.\nNumero: {numero}\nValor: {valor}"

    if tool_name == "agendar_revisao":
        placa = campo("placa", "N/A")
        data_hora = self._format_datetime_for_chat(campo("data_hora", "N/A"))
        protocolo = campo("protocolo", "N/A")
        linhas = [
            "Revisao agendada com sucesso.",
            f"Protocolo: {protocolo}",
            f"Placa: {placa}",
            f"Data/Hora: {data_hora}",
        ]
        valor = campo("valor_revisao")
        # The estimated price line is shown only for a numeric value.
        if isinstance(valor, (int, float)):
            linhas.append(f"Valor estimado: {self._format_currency_br(valor)}")
        return "\n".join(linhas)

    if tool_name == "cancelar_agendamento_revisao":
        protocolo = campo("protocolo", "N/A")
        status = campo("status", "N/A")
        placa = campo("placa", "N/A")
        data_hora = self._format_datetime_for_chat(campo("data_hora", "N/A"))
        return "\n".join(
            [
                "Agendamento atualizado.",
                f"Protocolo: {protocolo}",
                f"Placa: {placa}",
                f"Data/Hora: {data_hora}",
                f"Status: {status}",
            ]
        )

    if tool_name == "editar_data_revisao":
        protocolo = campo("protocolo", "N/A")
        placa = campo("placa", "N/A")
        data_hora = self._format_datetime_for_chat(campo("data_hora", "N/A"))
        status = campo("status", "N/A")
        return "\n".join(
            [
                "Agendamento remarcado com sucesso.",
                f"Protocolo: {protocolo}",
                f"Placa: {placa}",
                f"Nova data/hora: {data_hora}",
                f"Status: {status}",
            ]
        )

    if tool_name == "validar_cliente_venda":
        aprovado = campo("aprovado")
        limite = self._format_currency_br(campo("limite_credito"))
        score = campo("score", "N/A")
        cpf = campo("cpf", "N/A")
        if aprovado:
            headline = "Cliente aprovado para financiamento."
        else:
            headline = "Cliente nao aprovado para financiamento."
        return f"{headline}\nCPF: {cpf}\nScore: {score}\nLimite: {limite}"

    return generic_ok
|