You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
orquestrador/app/services/flows/review_flow.py

303 lines
13 KiB
Python

import re
from datetime import datetime, timedelta
from fastapi import HTTPException
from app.services.orchestration.orchestrator_config import (
LAST_REVIEW_PACKAGE_TTL_MINUTES,
PENDING_REVIEW_DRAFT_TTL_MINUTES,
REVIEW_REQUIRED_FIELDS,
)
class ReviewFlowMixin:
async def _try_handle_review_management(
self,
message: str,
user_id: int | None,
extracted_fields: dict | None = None,
intents: dict | None = None,
) -> str | None:
if user_id is None:
return None
normalized_intents = self._normalize_intents(intents)
draft = self.state.get_entry("pending_review_management_drafts", user_id, expire=True)
has_list_intent = normalized_intents.get("review_list", False)
has_cancel_intent = normalized_intents.get("review_cancel", False)
has_reschedule_intent = normalized_intents.get("review_reschedule", False)
if has_list_intent:
self._reset_pending_review_states(user_id=user_id)
try:
tool_result = await self.registry.execute(
"listar_agendamentos_revisao",
{"limite": 20},
user_id=user_id,
)
except HTTPException as exc:
return self._http_exception_detail(exc)
return self._fallback_format_tool_result("listar_agendamentos_revisao", tool_result)
if not has_cancel_intent and not has_reschedule_intent and draft is None:
return None
if draft is None:
action = "reschedule" if has_reschedule_intent else "cancel"
draft = {
"action": action,
"payload": {},
"expires_at": datetime.utcnow() + timedelta(minutes=PENDING_REVIEW_DRAFT_TTL_MINUTES),
}
else:
if has_reschedule_intent:
draft["action"] = "reschedule"
elif has_cancel_intent:
draft["action"] = "cancel"
extracted = self._normalize_review_management_fields(extracted_fields)
if "protocolo" not in extracted:
inferred_protocol = self._extract_review_protocol_from_text(message)
if inferred_protocol:
extracted["protocolo"] = inferred_protocol
action = draft.get("action", "cancel")
if (
action == "cancel"
and "motivo" not in extracted
and draft["payload"].get("protocolo")
and not has_cancel_intent
):
free_text = str(message or "").strip()
if free_text and len(free_text) >= 4 and not self._is_affirmative_message(free_text):
extracted["motivo"] = free_text
draft["payload"].update(extracted)
draft["expires_at"] = datetime.utcnow() + timedelta(minutes=PENDING_REVIEW_DRAFT_TTL_MINUTES)
self.state.set_entry("pending_review_management_drafts", user_id, draft)
if action == "reschedule":
missing = [field for field in ("protocolo", "nova_data_hora") if field not in draft["payload"]]
if missing:
return self._render_missing_review_reschedule_fields_prompt(missing)
try:
tool_result = await self.registry.execute(
"editar_data_revisao",
{
"protocolo": draft["payload"]["protocolo"],
"nova_data_hora": draft["payload"]["nova_data_hora"],
},
user_id=user_id,
)
except HTTPException as exc:
return self._http_exception_detail(exc)
self.state.pop_entry("pending_review_management_drafts", user_id)
return self._fallback_format_tool_result("editar_data_revisao", tool_result)
missing = [field for field in ("protocolo",) if field not in draft["payload"]]
if missing:
return self._render_missing_review_cancel_fields_prompt(missing)
try:
tool_result = await self.registry.execute(
"cancelar_agendamento_revisao",
{
"protocolo": draft["payload"]["protocolo"],
"motivo": draft["payload"].get("motivo"),
},
user_id=user_id,
)
except HTTPException as exc:
return self._http_exception_detail(exc)
self.state.pop_entry("pending_review_management_drafts", user_id)
return self._fallback_format_tool_result("cancelar_agendamento_revisao", tool_result)
def _render_missing_review_fields_prompt(self, missing_fields: list[str]) -> str:
labels = {
"placa": "a placa do veiculo",
"data_hora": "a data e hora desejada para a revisao",
"modelo": "o modelo do veiculo",
"ano": "o ano do veiculo",
"km": "a quilometragem atual (km)",
"revisao_previa_concessionaria": "se ja fez revisao na concessionaria (sim/nao)",
}
itens = [f"- {labels[field]}" for field in missing_fields]
return "Para agendar sua revisao, preciso dos dados abaixo:\n" + "\n".join(itens)
def _render_missing_review_cancel_fields_prompt(self, missing_fields: list[str]) -> str:
labels = {
"protocolo": "o protocolo da revisao (ex.: REV-20260310-ABC12345)",
}
itens = [f"- {labels[field]}" for field in missing_fields]
return "Para cancelar o agendamento de revisao, preciso dos dados abaixo:\n" + "\n".join(itens)
def _render_missing_review_reschedule_fields_prompt(self, missing_fields: list[str]) -> str:
labels = {
"protocolo": "o protocolo da revisao (ex.: REV-20260310-ABC12345)",
"nova_data_hora": "a nova data e hora desejada para a revisao",
}
itens = [f"- {labels[field]}" for field in missing_fields]
return "Para remarcar sua revisao, preciso dos dados abaixo:\n" + "\n".join(itens)
def _render_review_reuse_question(self) -> str:
return (
"Deseja usar os mesmos dados do ultimo veiculo e informar so a data/hora da revisao? "
"(sim/nao)"
)
def _store_last_review_package(self, user_id: int | None, payload: dict | None) -> None:
if user_id is None or not isinstance(payload, dict):
return
package = {
"placa": payload.get("placa"),
"modelo": payload.get("modelo"),
"ano": payload.get("ano"),
"km": payload.get("km"),
"revisao_previa_concessionaria": payload.get("revisao_previa_concessionaria"),
}
sanitized = {k: v for k, v in package.items() if v is not None}
required = {"placa", "modelo", "ano", "km", "revisao_previa_concessionaria"}
if not required.issubset(sanitized.keys()):
return
self.state.set_entry(
"last_review_packages",
user_id,
{
"payload": sanitized,
"expires_at": datetime.utcnow() + timedelta(minutes=LAST_REVIEW_PACKAGE_TTL_MINUTES),
},
)
def _get_last_review_package(self, user_id: int | None) -> dict | None:
if user_id is None:
return None
cached = self.state.get_entry("last_review_packages", user_id, expire=True)
if not cached:
return None
payload = cached.get("payload")
return dict(payload) if isinstance(payload, dict) else None
async def _try_collect_and_schedule_review(
self,
message: str,
user_id: int | None,
extracted_fields: dict | None = None,
intents: dict | None = None,
) -> str | None:
if user_id is None:
return None
normalized_intents = self._normalize_intents(intents)
has_intent = normalized_intents.get("review_schedule", False)
has_management_intent = (
normalized_intents.get("review_list", False)
or normalized_intents.get("review_cancel", False)
or normalized_intents.get("review_reschedule", False)
)
if has_management_intent:
self.state.pop_entry("pending_review_drafts", user_id)
self.state.pop_entry("pending_review_reuse_confirmations", user_id)
return None
draft = self.state.get_entry("pending_review_drafts", user_id, expire=True)
extracted = self._normalize_review_fields(extracted_fields)
pending_reuse = self.state.get_entry("pending_review_reuse_confirmations", user_id, expire=True)
if pending_reuse:
should_reuse = False
if self._is_negative_message(message):
self.state.pop_entry("pending_review_reuse_confirmations", user_id)
pending_reuse = None
elif self._is_affirmative_message(message) or "data_hora" in extracted:
should_reuse = True
else:
return self._render_review_reuse_question()
if should_reuse:
seed_payload = dict(pending_reuse.get("payload") or {})
if draft is None:
draft = {
"payload": seed_payload,
"expires_at": datetime.utcnow() + timedelta(minutes=PENDING_REVIEW_DRAFT_TTL_MINUTES),
}
else:
for key, value in seed_payload.items():
draft["payload"].setdefault(key, value)
self.state.pop_entry("pending_review_reuse_confirmations", user_id)
if "data_hora" not in extracted:
self.state.set_entry("pending_review_drafts", user_id, draft)
return "Perfeito. Me informe apenas a data e hora desejada para a revisao."
if has_intent and draft is None and not extracted:
last_package = self._get_last_review_package(user_id=user_id)
if last_package:
self.state.set_entry(
"pending_review_reuse_confirmations",
user_id,
{
"payload": last_package,
"expires_at": datetime.utcnow() + timedelta(minutes=PENDING_REVIEW_DRAFT_TTL_MINUTES),
},
)
return self._render_review_reuse_question()
if (
draft
and not has_intent
and (
normalized_intents.get("order_create", False)
or normalized_intents.get("order_cancel", False)
)
and not extracted
):
self.state.pop_entry("pending_review_drafts", user_id)
return None
if not has_intent and draft is None:
return None
if draft is None:
draft = {
"payload": {},
"expires_at": datetime.utcnow() + timedelta(minutes=PENDING_REVIEW_DRAFT_TTL_MINUTES),
}
draft["payload"].update(extracted)
self._try_prefill_review_fields_from_memory(user_id=user_id, payload=draft["payload"])
if (
"revisao_previa_concessionaria" not in draft["payload"]
and draft["payload"]
and not extracted
):
if self._is_affirmative_message(message):
draft["payload"]["revisao_previa_concessionaria"] = True
elif self._is_negative_message(message):
draft["payload"]["revisao_previa_concessionaria"] = False
draft["expires_at"] = datetime.utcnow() + timedelta(minutes=PENDING_REVIEW_DRAFT_TTL_MINUTES)
self.state.set_entry("pending_review_drafts", user_id, draft)
missing = [field for field in REVIEW_REQUIRED_FIELDS if field not in draft["payload"]]
if missing:
return self._render_missing_review_fields_prompt(missing)
try:
tool_result = await self.registry.execute(
"agendar_revisao",
draft["payload"],
user_id=user_id,
)
except HTTPException as exc:
self._capture_review_confirmation_suggestion(
tool_name="agendar_revisao",
arguments=draft["payload"],
exc=exc,
user_id=user_id,
)
if self.state.get_entry("pending_review_confirmations", user_id, expire=True):
self.state.pop_entry("pending_review_drafts", user_id)
return self._http_exception_detail(exc)
self.state.pop_entry("pending_review_drafts", user_id)
self._store_last_review_package(user_id=user_id, payload=draft["payload"])
return self._fallback_format_tool_result("agendar_revisao", tool_result)