You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
1416 lines
63 KiB
Python
1416 lines
63 KiB
Python
import re
|
|
from datetime import datetime, timedelta
|
|
from app.core.time_utils import utc_now
|
|
from typing import TYPE_CHECKING
|
|
|
|
from app.services.orchestration.orchestrator_config import (
|
|
CANCEL_ORDER_REQUIRED_FIELDS,
|
|
ORDER_REQUIRED_FIELDS,
|
|
PENDING_ORDER_SELECTION_TTL_MINUTES,
|
|
REVIEW_REQUIRED_FIELDS,
|
|
)
|
|
|
|
if TYPE_CHECKING:
|
|
from app.services.orchestration.orquestrador_service import OrquestradorService
|
|
|
|
# Tuples of context field names, grouped per flow.
GENERIC_MEMORY_SUMMARY_FIELDS = ("placa", "cpf", "orcamento_max", "perfil_veiculo")

REVIEW_SUMMARY_FIELDS = (
    "placa",
    "data_hora",
    "data_hora_base",
    "modelo",
    "ano",
    "km",
    "revisao_previa_concessionaria",
)

REVIEW_MANAGEMENT_SUMMARY_FIELDS = ("protocolo", "nova_data_hora", "motivo")

ORDER_SUMMARY_FIELDS = ("cpf", "vehicle_id", "modelo_veiculo", "valor_veiculo")

CANCEL_ORDER_SUMMARY_FIELDS = ("numero_pedido", "motivo")

# Maps a context field name to the label text associated with it.
CONTEXT_FIELD_LABELS = {
    # generic memory
    "placa": "placa",
    "cpf": "cpf",
    "orcamento_max": "orcamento",
    "perfil_veiculo": "perfil",
    # review scheduling
    "data_hora": "data/hora",
    "data_hora_base": "data base",
    "modelo": "modelo",
    "ano": "ano",
    "km": "km",
    "revisao_previa_concessionaria": "revisao previa na concessionaria",
    # review management
    "protocolo": "protocolo",
    "nova_data_hora": "nova data/hora",
    "motivo": "motivo",
    # orders
    "vehicle_id": "vehicle_id",
    "modelo_veiculo": "modelo_veiculo",
    "valor_veiculo": "valor_veiculo",
    "numero_pedido": "numero_pedido",
    # rental
    "rental_vehicle_id": "rental_vehicle_id",
    "data_inicio": "data de inicio",
    "data_fim_prevista": "data fim prevista",
    "valor_diaria_max": "valor maximo da diaria",
}

# Maps an active task key to its label text.
ACTIVE_TASK_LABELS = {
    "review_schedule": "agendamento de revisao",
    "review_management": "gestao de revisao",
    "order_create": "criacao de pedido",
    "order_cancel": "cancelamento de pedido",
    "rental_create": "abertura de locacao",
}

# Domains whose orders can be queued or offered for selection ("general" is excluded).
ACTIONABLE_ORDER_DOMAINS = {"review", "sales", "rental"}
|
|
|
|
# essa classe é responsável por controlar qual o assunto está ativo na conversa, se existe fluxo aberto, se o usuário mandou dois pedidos ao mesmo tempo...
|
|
class ConversationPolicy:
|
|
def __init__(self, service: "OrquestradorService"):
|
|
self.service = service
|
|
|
|
def _save_context(self, user_id: int | None, context: dict | None) -> None:
|
|
if user_id is None or not isinstance(context, dict):
|
|
return
|
|
self.service._save_user_context(user_id=user_id, context=context)
|
|
|
|
def _decision_action(self, turn_decision: dict | None) -> str:
|
|
return str((turn_decision or {}).get("action") or "").strip().lower()
|
|
|
|
def _decision_intent(self, turn_decision: dict | None) -> str:
|
|
return str((turn_decision or {}).get("intent") or "").strip().lower()
|
|
|
|
def _decision_domain(self, turn_decision: dict | None) -> str:
|
|
return str((turn_decision or {}).get("domain") or "").strip().lower()
|
|
|
|
def _decision_selection_index(self, turn_decision: dict | None) -> int | None:
|
|
value = (turn_decision or {}).get("selection_index")
|
|
return value if isinstance(value, int) and value >= 0 else None
|
|
|
|
# Essa função serve para reaproveitar informações já informadas antes, evitando pedir novamente ao usuário.
|
|
def try_prefill_review_fields_from_memory(self, user_id: int | None, payload: dict) -> None:
|
|
if user_id is None:
|
|
return
|
|
context = self.service._get_user_context(user_id)
|
|
if not context:
|
|
return
|
|
memory = context.get("generic_memory", {})
|
|
if payload.get("placa") is None:
|
|
plate = self.service.normalizer.normalize_plate(memory.get("placa"))
|
|
if plate:
|
|
payload["placa"] = plate
|
|
|
|
# Essa função coloca um pedido na fila
|
|
def queue_order(self, user_id: int | None, domain: str, order_message: str) -> None:
|
|
self.queue_order_with_memory_seed(
|
|
user_id=user_id,
|
|
domain=domain,
|
|
order_message=order_message,
|
|
memory_seed=self.service._new_tab_memory(user_id=user_id),
|
|
)
|
|
|
|
# Enfileira um próximo assunto para ser tratado depois, preservando dados úteis daquele pedido
|
|
def queue_order_with_memory_seed(
|
|
self,
|
|
user_id: int | None,
|
|
domain: str,
|
|
order_message: str,
|
|
memory_seed: dict | None = None,
|
|
) -> bool:
|
|
context = self.service._get_user_context(user_id)
|
|
if not context or domain == "general":
|
|
return False
|
|
queue = context.setdefault("order_queue", [])
|
|
queue.append(
|
|
{
|
|
"domain": domain,
|
|
"message": self.build_order_execution_message(domain, order_message),
|
|
"memory_seed": dict(memory_seed or self.service._new_tab_memory(user_id=user_id)),
|
|
"created_at": utc_now().isoformat(),
|
|
}
|
|
)
|
|
self._save_context(user_id=user_id, context=context)
|
|
return True
|
|
|
|
|
|
# Transforma as entidades extraídas de um pedido em uma memória temporária pronta para usar quando esse pedido for processado.
|
|
def build_order_memory_seed(self, user_id: int | None, order: dict | None = None) -> dict:
|
|
seed = dict(self.service._new_tab_memory(user_id=user_id))
|
|
if not isinstance(order, dict):
|
|
return seed
|
|
|
|
entities = order.get("entities")
|
|
if not isinstance(entities, dict):
|
|
return seed
|
|
|
|
generic_memory = self.service.normalizer.normalize_generic_fields(entities.get("generic_memory"))
|
|
if generic_memory:
|
|
seed.update(generic_memory)
|
|
|
|
review_fields = self.service.normalizer.normalize_review_fields(entities.get("review_fields"))
|
|
if review_fields.get("placa") and not seed.get("placa"):
|
|
seed["placa"] = review_fields["placa"]
|
|
|
|
order_fields = self.service.normalizer.normalize_order_fields(entities.get("order_fields"))
|
|
if order_fields.get("cpf") and not seed.get("cpf"):
|
|
seed["cpf"] = order_fields["cpf"]
|
|
if order_fields.get("modelo_veiculo") and not seed.get("modelo_veiculo"):
|
|
seed["modelo_veiculo"] = order_fields["modelo_veiculo"]
|
|
|
|
return seed
|
|
|
|
|
|
# Pega o próximo assunto pendente do usuário.
|
|
def pop_next_order(self, user_id: int | None) -> dict | None:
|
|
context = self.service._get_user_context(user_id)
|
|
if not context:
|
|
return None
|
|
queue = context.setdefault("order_queue", [])
|
|
if not queue:
|
|
return None
|
|
popped = queue.pop(0)
|
|
self._save_context(user_id=user_id, context=context)
|
|
return popped
|
|
|
|
|
|
|
|
# Decide qual pedido deve ser processado agora, qual vai para fila, e se o usuário precisa escolher entre dois pedidos.
|
|
def prepare_message_for_single_order(
|
|
self,
|
|
message: str,
|
|
user_id: int | None,
|
|
routing_plan: dict | None = None,
|
|
) -> tuple[str, str | None, str | None]:
|
|
context = self.service._get_user_context(user_id)
|
|
if not context:
|
|
return message, None, None
|
|
|
|
queue_notice = None
|
|
active_domain = context.get("active_domain", "general")
|
|
|
|
orders_raw = (routing_plan or {}).get("orders") if isinstance(routing_plan, dict) else None
|
|
extracted_orders: list[dict] = []
|
|
if isinstance(orders_raw, list):
|
|
for item in orders_raw:
|
|
if not isinstance(item, dict):
|
|
continue
|
|
domain = str(item.get("domain") or "general").strip().lower()
|
|
if domain not in ACTIONABLE_ORDER_DOMAINS | {"general"}:
|
|
domain = "general"
|
|
segment = str(item.get("message") or "").strip()
|
|
if segment:
|
|
extracted_orders.append(
|
|
{
|
|
"domain": domain,
|
|
"message": segment,
|
|
"entities": self.service._coerce_extraction_contract(item.get("entities")),
|
|
}
|
|
)
|
|
if not extracted_orders:
|
|
extracted_orders = [{"domain": "general", "message": (message or "").strip()}]
|
|
extracted_orders = self.augment_actionable_orders_from_message(
|
|
message=message,
|
|
extracted_orders=extracted_orders,
|
|
)
|
|
|
|
actionable_orders = [order for order in extracted_orders if order["domain"] in ACTIONABLE_ORDER_DOMAINS]
|
|
|
|
if (
|
|
len(actionable_orders) >= 2
|
|
and not self.has_open_flow(user_id=user_id, domain=active_domain)
|
|
):
|
|
self.store_pending_order_selection(user_id=user_id, orders=actionable_orders)
|
|
return message, None, self.render_order_selection_prompt(actionable_orders)
|
|
|
|
if len(extracted_orders) <= 1:
|
|
inferred = extracted_orders[0]["domain"]
|
|
if (
|
|
inferred != "general"
|
|
and inferred != active_domain
|
|
and self.has_open_flow(user_id=user_id, domain=active_domain)
|
|
):
|
|
# Para uma troca explicita de dominio com fluxo aberto,
|
|
# deixa a confirmacao de context switch acontecer no ponto
|
|
# normal da policy em vez de enfileirar cedo demais.
|
|
return message, None, None
|
|
return message, None, None
|
|
|
|
if self.has_open_flow(user_id=user_id, domain=active_domain):
|
|
queued_count = 0
|
|
for queued in actionable_orders:
|
|
if queued["domain"] != active_domain:
|
|
queued_count += int(
|
|
self.queue_order_with_memory_seed(
|
|
user_id=user_id,
|
|
domain=queued["domain"],
|
|
order_message=queued["message"],
|
|
memory_seed=self.build_order_memory_seed(user_id=user_id, order=queued),
|
|
)
|
|
)
|
|
queue_hint = self.render_queue_notice(queued_count)
|
|
prompt = self.render_open_flow_prompt(user_id=user_id, domain=active_domain)
|
|
return message, None, f"{prompt}\n{queue_hint}" if queue_hint else prompt
|
|
|
|
first = actionable_orders[0] if actionable_orders else extracted_orders[0]
|
|
queued_count = 0
|
|
for queued in actionable_orders:
|
|
if queued is first:
|
|
continue
|
|
queued_count += int(
|
|
self.queue_order_with_memory_seed(
|
|
user_id=user_id,
|
|
domain=queued["domain"],
|
|
order_message=queued["message"],
|
|
memory_seed=self.build_order_memory_seed(user_id=user_id, order=queued),
|
|
)
|
|
)
|
|
context["active_domain"] = first["domain"]
|
|
context["generic_memory"] = self.build_order_memory_seed(user_id=user_id, order=first)
|
|
self._save_context(user_id=user_id, context=context)
|
|
|
|
queue_notice = self.render_queue_notice(queued_count)
|
|
return first["message"], queue_notice, None
|
|
|
|
|
|
# Serve para concatenar mensagens auxiliares com a resposta principal.
|
|
def compose_order_aware_response(self, response: str, queue_notice: str | None = None) -> str:
|
|
lines = []
|
|
if queue_notice:
|
|
lines.append(queue_notice)
|
|
lines.append(response)
|
|
return "\n".join(lines)
|
|
|
|
|
|
# Armazena uma “decisão pendente” de qual pedido começar.
|
|
def store_pending_order_selection(self, user_id: int | None, orders: list[dict]) -> None:
|
|
context = self.service._get_user_context(user_id)
|
|
if not context:
|
|
return
|
|
context["pending_order_selection"] = {
|
|
"orders": [
|
|
{
|
|
"domain": order["domain"],
|
|
"message": order["message"],
|
|
"seed_message": self.build_order_execution_message(order["domain"], order["message"]),
|
|
"memory_seed": self.build_order_memory_seed(user_id=user_id, order=order),
|
|
}
|
|
for order in orders
|
|
],
|
|
"expires_at": utc_now() + timedelta(minutes=PENDING_ORDER_SELECTION_TTL_MINUTES),
|
|
}
|
|
self._save_context(user_id=user_id, context=context)
|
|
|
|
|
|
# Cria o texto de escolha para o usuário.
|
|
def render_order_selection_prompt(self, orders: list[dict]) -> str:
    """Return the prompt asking which of the detected orders to start first.

    With fewer than two orders only the bare question is returned; otherwise
    the orders are listed numbered from 1.
    """
    if len(orders) < 2:
        return "Qual das acoes voce quer iniciar primeiro?"

    numbered_lines = []
    for position, order in enumerate(orders, start=1):
        numbered_lines.append(f"{position}. {self.describe_order_selection_option(order)}")
    enumerated_orders = "\n".join(numbered_lines)

    header = f"Identifiquei {len(orders)} acoes na sua mensagem:\n"
    footer = "Qual delas voce quer iniciar primeiro? Se for indiferente, eu escolho."
    return header + f"{enumerated_orders}\n" + footer
|
|
|
|
def build_order_execution_message(self, domain: str, order_message: str | None) -> str:
|
|
raw_message = str(order_message or "").strip()
|
|
normalized = self.service.normalizer.normalize_text(raw_message).strip()
|
|
if domain == "sales" and normalized in {"compra", "comprar", "venda", "pedido"}:
|
|
return "quero comprar um veiculo"
|
|
if domain == "review" and normalized in {"revisao", "agendamento", "agendar", "marcar revisao"}:
|
|
return "quero agendar revisao"
|
|
if domain == "rental" and normalized in {"aluguel", "alugar", "locacao", "locar"}:
|
|
return "quero alugar um carro"
|
|
return raw_message
|
|
|
|
def augment_actionable_orders_from_message(self, message: str, extracted_orders: list[dict]) -> list[dict]:
    """Add keyword-derived orders for domains missing from ``extracted_orders``.

    For each of sales, review and rental not already represented, a synthetic
    order (short label as message, empty extraction payload as entities) is
    appended when the normalized message mentions one of that domain's terms.
    The input list is returned as-is when the normalized message is empty;
    otherwise a new list is returned and the input is left untouched.

    A term only counts when it begins a word. Plain substring matching made
    "locar" fire inside "colocar" and "pedido" inside "impedido", creating
    phantom orders. Inflected endings ("compras", "pedidos") still match;
    prefixed forms such as "reagendar" no longer match through "agendar".
    """
    normalized = self.service.normalizer.normalize_text(message).strip()
    if not normalized:
        return extracted_orders

    existing_domains = {
        str(order.get("domain") or "general")
        for order in extracted_orders
        if isinstance(order, dict)
    }
    domain_hints = (
        ("sales", {"compra", "comprar", "venda", "pedido"}, "compra"),
        ("review", {"revisao", "agendamento", "agendar", "remarcar"}, "revisao"),
        ("rental", {"aluguel", "alugar", "locacao", "locar"}, "aluguel"),
    )

    def mentions(term: str) -> bool:
        # ``(?<!\w)`` rejects a match that starts in the middle of a word.
        return re.search(r"(?<!\w)" + re.escape(term), normalized) is not None

    augmented = list(extracted_orders)
    for domain, terms, label in domain_hints:
        if domain in existing_domains:
            continue
        if any(mentions(term) for term in terms):
            augmented.append(
                {
                    "domain": domain,
                    "message": label,
                    "entities": self.service.normalizer.empty_extraction_payload(),
                }
            )
    return augmented
|
|
|
|
def render_multi_order_clarification_prompt(self, orders: list[dict]) -> str:
    """Return a clarification prompt listing at most the first three orders.

    With no orders a generic one-line question is returned.
    """
    if not orders:
        return "Identifiquei mais de um assunto. Me diga qual voce quer iniciar primeiro."

    bullet_lines = [f"- {self.describe_order_selection_option(order)}" for order in orders[:3]]
    options = "\n".join(bullet_lines)
    intro = "Identifiquei mais de um assunto na sua mensagem:\n"
    closing = "Para eu nao misturar os fluxos, me diga qual deles voce quer comecar primeiro."
    return intro + f"{options}\n" + closing
|
|
|
|
|
|
# Formata o rótulo do pedido para exibição.
|
|
def describe_order_selection_option(self, order: dict) -> str:
    """Return ``"<domain label>: <message>"`` for display in a selection list.

    Unknown or missing domains use the "Atendimento" label.
    """
    labels_by_domain = {
        "review": "Revisao",
        "sales": "Venda",
        "rental": "Locacao",
        "general": "Atendimento",
    }
    domain_key = str(order.get("domain") or "general")
    text = str(order.get("message") or "").strip()
    label = labels_by_domain.get(domain_key, "Atendimento")
    return f"{label}: {text}"
|
|
|
|
|
|
# É um helper simples para busca de palavras-chave
|
|
def contains_any_term(self, text: str, terms: set[str]) -> bool:
    """Return ``True`` when at least one of ``terms`` is a substring of ``text``."""
    for term in terms:
        if term in text:
            return True
    return False
|
|
|
|
|
|
# Limpa a mensagem para facilitar a detecção de escolha
|
|
def strip_choice_message(self, text: str) -> str:
    """Trim ``text`` and drop trailing ``.!?,;:`` characters, along with any
    whitespace found before them."""
    trailing_punctuation = tuple(".!?,;:")
    cleaned = (text or "").strip()
    while cleaned.endswith(trailing_punctuation):
        cleaned = cleaned[:-1].rstrip()
    return cleaned
|
|
|
|
|
|
# Se o usuário disser algo como: "esquece tudo e quero agendar revisão" a função remove a parte de reset e devolve só o novo pedido.
|
|
def remove_order_selection_reset_prefix(self, message: str) -> str:
    """Strip a leading "forget everything" phrase and return the rest.

    The trimmed message is normalized and compared against known reset
    prefixes (longer variants first). On a match, that many characters are cut
    from the raw message and leading ``" ,.:;-"`` characters are removed. With
    no match the trimmed message is returned unchanged.

    A prefix only matches when it ends at a word boundary. Without that check
    the prefix "... anteriores e" matched "... anteriores em andamento" and
    returned the broken remainder "m andamento".
    """
    raw = (message or "").strip()
    normalized = self.service.normalizer.normalize_text(raw)
    prefixes = (
        "esqueca as operacoes anteriores e",
        "esqueca as operacoes anteriores, agora",
        "esqueca as operacoes anteriores agora",
        "esqueca as operacoes anteriores",
        "desconsidere as operacoes anteriores",
        "desconsidera as operacoes anteriores",
        "esqueca a conversa anterior e",
        "esqueca tudo agora",
        "esquece tudo agora",
        "esqueca tudo",
        "esquece tudo",
    )
    for prefix in prefixes:
        if not normalized.startswith(prefix):
            continue
        following = normalized[len(prefix):len(prefix) + 1]
        if following and (following.isalnum() or following == "_"):
            # The prefix ended inside a word; try a shorter variant instead.
            continue
        # NOTE(review): the cut uses the prefix length on the raw text, which
        # assumes normalize_text preserves character positions - confirm.
        return raw[len(prefix):].lstrip(" ,.:;-")
    return raw
|
|
|
|
|
|
# Detecta intenção de abandonar o contexto atual
|
|
def is_order_selection_reset_message(self, message: str) -> bool:
    """Return ``True`` when the normalized message contains a reset phrase.

    Matching is by substring (via ``contains_any_term``), so the phrase may
    appear anywhere in the message.
    """
    reset_terms = {
        # "forget / disregard what came before"
        "esqueca as operacoes anteriores e",
        "esqueca as operacoes anteriores, agora",
        "esqueca as operacoes anteriores",
        "desconsidere as operacoes anteriores",
        "desconsidera as operacoes anteriores",
        "esqueca a conversa anterior",
        "esqueca tudo",
        "esqueca tudo agora",
        "esquece tudo",
        "esquece tudo agora",
        # "ignore / drop it"
        "ignora isso",
        "ignore isso",
        "deixa isso",
        "deixa pra la",
        "deixa para la",
        # "new subject / start over"
        "novo assunto",
        "muda de assunto",
        "vamos comecar de novo",
        "comecar de novo",
        "reiniciar",
        "resetar conversa",
    }
    cleaned = self.service.normalizer.normalize_text(message).strip()
    return self.contains_any_term(cleaned, reset_terms)
|
|
|
|
|
|
# Ajuda a perceber quando o usuário talvez tenha mudado de assunto sem responder à pergunta de escolha
|
|
def looks_like_fresh_operational_request(self, message: str, turn_decision: dict | None = None) -> bool:
|
|
decision_domain = self._decision_domain(turn_decision)
|
|
decision_intent = self._decision_intent(turn_decision)
|
|
if decision_domain in {"review", "sales", "rental"} or decision_intent not in {"", "general"}:
|
|
return True
|
|
return self.looks_like_fresh_operational_request_from_text(message)
|
|
|
|
def looks_like_fresh_operational_request_from_text(self, message: str) -> bool:
    """Text-only heuristic for a new operational request.

    Normalized messages shorter than 15 characters never qualify; longer ones
    qualify when they contain any of the operational keywords as a substring.
    """
    operational_terms = {
        "agendar",
        "revisao",
        "cancelar",
        "pedido",
        "comprar",
        "compra",
        "carro",
        "veiculo",
        "remarcar",
        "tambem",
        "aluguel",
        "alugar",
        "locacao",
        "locar",
        "devolver",
    }
    cleaned = self.service.normalizer.normalize_text(message).strip()
    if len(cleaned) >= 15:
        return self.contains_any_term(cleaned, operational_terms)
    return False
|
|
|
|
def is_explicit_pending_order_selection_message(
|
|
self,
|
|
message: str,
|
|
turn_decision: dict | None = None,
|
|
) -> bool:
|
|
if self._decision_selection_index(turn_decision) is not None:
|
|
return True
|
|
|
|
normalized = self.strip_choice_message(self.service.normalizer.normalize_text(message))
|
|
if not normalized:
|
|
return False
|
|
|
|
indifferent_tokens = {
|
|
"tanto faz",
|
|
"indiferente",
|
|
"qualquer um",
|
|
"qualquer uma",
|
|
"voce escolhe",
|
|
"pode escolher",
|
|
"fica a seu criterio",
|
|
}
|
|
if normalized in indifferent_tokens:
|
|
return True
|
|
if re.fullmatch(r"(?:opcao|acao|pedido)?\s*(\d+)", normalized):
|
|
return True
|
|
|
|
explicit_selection_messages = {
|
|
"compra",
|
|
"comprar",
|
|
"quero comprar",
|
|
"quero comprar um veiculo",
|
|
"venda",
|
|
"pedido",
|
|
"revisao",
|
|
"agendamento",
|
|
"agendar",
|
|
"agendar revisao",
|
|
"quero agendar revisao",
|
|
"aluguel",
|
|
"alugar",
|
|
"quero alugar",
|
|
"quero alugar um carro",
|
|
"locacao",
|
|
"locar",
|
|
}
|
|
return normalized in explicit_selection_messages
|
|
|
|
def derive_operational_task_key(
|
|
self,
|
|
*,
|
|
message: str,
|
|
turn_decision: dict | None = None,
|
|
fallback_domain: str | None = None,
|
|
) -> str | None:
|
|
normalized = self.service.normalizer.normalize_text(message).strip()
|
|
domain = self._decision_domain(turn_decision) or str(fallback_domain or "").strip().lower()
|
|
intent = self._decision_intent(turn_decision)
|
|
tool_name = str((turn_decision or {}).get("tool_name") or "").strip().lower()
|
|
|
|
if domain == "sales":
|
|
if intent == "order_list" or tool_name == "listar_pedidos" or "quais pedidos" in normalized:
|
|
return "sales:list"
|
|
if intent == "order_cancel" or tool_name == "cancelar_pedido" or ("cancel" in normalized and "pedido" in normalized):
|
|
return "sales:cancel"
|
|
if tool_name == "avaliar_veiculo_troca" or ("avali" in normalized and "troca" in normalized):
|
|
return "sales:trade_in"
|
|
if (
|
|
intent in {"order_create", "inventory_search"}
|
|
or tool_name in {"consultar_estoque", "realizar_pedido"}
|
|
or self.contains_any_term(normalized, {"compra", "comprar", "venda", "carro", "veiculo"})
|
|
):
|
|
return "sales:create"
|
|
|
|
if domain == "review":
|
|
if intent == "review_list" or tool_name == "listar_agendamentos_revisao" or "agendamentos" in normalized:
|
|
return "review:list"
|
|
if intent == "review_cancel" or ("cancel" in normalized and "revis" in normalized):
|
|
return "review:cancel"
|
|
if intent == "review_reschedule" or "remarc" in normalized:
|
|
return "review:reschedule"
|
|
if (
|
|
intent == "review_schedule"
|
|
or tool_name == "agendar_revisao"
|
|
or self.contains_any_term(normalized, {"revisao", "agendar", "agendamento"})
|
|
):
|
|
return "review:schedule"
|
|
|
|
if domain == "rental":
|
|
if intent == "rental_list" or tool_name == "consultar_frota_aluguel" or "frota" in normalized:
|
|
return "rental:list"
|
|
if tool_name == "registrar_devolucao_aluguel" or "devol" in normalized:
|
|
return "rental:return"
|
|
if tool_name == "registrar_pagamento_aluguel" or "comprovante" in normalized or "pagamento" in normalized:
|
|
return "rental:payment"
|
|
if (
|
|
intent == "rental_create"
|
|
or self.contains_any_term(normalized, {"aluguel", "alugar", "locacao", "locar"})
|
|
):
|
|
return "rental:create"
|
|
|
|
return None
|
|
|
|
def derive_pending_order_task_key(self, order: dict) -> str | None:
|
|
return self.derive_operational_task_key(
|
|
message=str(order.get("seed_message") or order.get("message") or ""),
|
|
fallback_domain=str(order.get("domain") or "general"),
|
|
)
|
|
|
|
def queue_pending_orders_for_later(
|
|
self,
|
|
*,
|
|
user_id: int | None,
|
|
orders: list[dict],
|
|
skip_task_key: str | None = None,
|
|
) -> int:
|
|
queued_count = 0
|
|
skipped_matching_task = False
|
|
for order in orders:
|
|
if skip_task_key and not skipped_matching_task and self.derive_pending_order_task_key(order) == skip_task_key:
|
|
skipped_matching_task = True
|
|
continue
|
|
queued_count += int(
|
|
self.queue_order_with_memory_seed(
|
|
user_id=user_id,
|
|
domain=order["domain"],
|
|
order_message=order["message"],
|
|
memory_seed=order.get("memory_seed"),
|
|
)
|
|
)
|
|
return queued_count
|
|
|
|
|
|
# Distingue um comando global explicito de cancelamento do fluxo atual de um texto livre
|
|
# que deve ser consumido como dado do rascunho aberto.
|
|
def is_explicit_flow_cancel_message(self, message: str) -> bool:
    """Return ``True`` only when the whole normalized message is an explicit
    "cancel this flow" command (exact match, not substring)."""
    cancel_commands = {
        "cancelar fluxo",
        "cancela o fluxo",
        "cancelar esse fluxo",
        "cancela esse fluxo",
        "cancelar fluxo atual",
        "cancela o fluxo atual",
        "encerrar fluxo",
        "encerrar esse fluxo",
        "parar fluxo",
        "parar esse fluxo",
        "abandonar fluxo",
        "abandonar esse fluxo",
        "desistir desse fluxo",
        "desistir deste fluxo",
        "desistir dessa compra",
        "desistir desta compra",
    }
    cleaned = self.service.normalizer.normalize_text(message).strip()
    return cleaned in cancel_commands
|
|
|
|
|
|
# Evita que frases como "desisti" sejam tratadas como comando global quando o sistema
|
|
# esta justamente esperando o motivo do cancelamento.
|
|
def should_defer_flow_cancellation_control(self, message: str, user_id: int | None) -> bool:
|
|
if user_id is None or self.is_explicit_flow_cancel_message(message):
|
|
return False
|
|
|
|
context = self.service._get_user_context(user_id)
|
|
active_domain = str((context or {}).get("active_domain") or "general")
|
|
if (
|
|
self.looks_like_fresh_operational_request_from_text(message)
|
|
and not self.has_open_flow(user_id=user_id, domain=active_domain)
|
|
):
|
|
return True
|
|
|
|
pending_cancel_order = self.service.state.get_entry("pending_cancel_order_drafts", user_id, expire=True)
|
|
if pending_cancel_order:
|
|
payload = pending_cancel_order.get("payload", {})
|
|
if payload.get("numero_pedido") and not payload.get("motivo"):
|
|
free_text = str(message or "").strip()
|
|
if len(free_text) >= 4:
|
|
return True
|
|
|
|
pending_review_management = self.service.state.get_entry("pending_review_management_drafts", user_id, expire=True)
|
|
if pending_review_management:
|
|
payload = pending_review_management.get("payload", {})
|
|
action = pending_review_management.get("action", "cancel")
|
|
if action == "cancel" and payload.get("protocolo") and not payload.get("motivo"):
|
|
free_text = str(message or "").strip()
|
|
if len(free_text) >= 4 and not self.service._is_affirmative_message(free_text):
|
|
return True
|
|
|
|
pending_review_reuse = self.service.state.get_entry("pending_review_reuse_confirmations", user_id, expire=True)
|
|
if pending_review_reuse:
|
|
free_text = str(message or "").strip()
|
|
if free_text:
|
|
return True
|
|
|
|
return False
|
|
|
|
|
|
# Interpreta a resposta do usuário na etapa de seleção.
|
|
def detect_selected_order_index(
|
|
self,
|
|
message: str,
|
|
orders: list[dict],
|
|
turn_decision: dict | None = None,
|
|
) -> tuple[int | None, bool]:
|
|
selection_index = self._decision_selection_index(turn_decision)
|
|
if selection_index is not None and 0 <= selection_index < len(orders):
|
|
return selection_index, False
|
|
|
|
normalized = self.strip_choice_message(self.service.normalizer.normalize_text(message))
|
|
indifferent_tokens = {
|
|
"tanto faz",
|
|
"indiferente",
|
|
"qualquer um",
|
|
"qualquer uma",
|
|
"voce escolhe",
|
|
"pode escolher",
|
|
"fica a seu criterio",
|
|
}
|
|
if normalized in indifferent_tokens:
|
|
return 0, True
|
|
numeric_match = re.fullmatch(r"(?:opcao|acao|pedido)?\s*(\d+)", normalized)
|
|
if numeric_match:
|
|
candidate = int(numeric_match.group(1)) - 1
|
|
if 0 <= candidate < len(orders):
|
|
return candidate, False
|
|
if normalized in {"1", "primeiro", "primeira", "opcao 1", "acao 1", "pedido 1"}:
|
|
return 0, False
|
|
if normalized in {"2", "segundo", "segunda", "opcao 2", "acao 2", "pedido 2"}:
|
|
return 1, False
|
|
if normalized in {"3", "terceiro", "terceira", "opcao 3", "acao 3", "pedido 3"}:
|
|
return (2, False) if len(orders) >= 3 else (None, False)
|
|
decision_domain = self._decision_domain(turn_decision)
|
|
if len(orders) >= 2 and decision_domain in ACTIONABLE_ORDER_DOMAINS:
|
|
matches = [index for index, order in enumerate(orders) if order.get("domain") == decision_domain]
|
|
if len(matches) == 1:
|
|
return matches[0], False
|
|
|
|
review_matches = [index for index, order in enumerate(orders) if order.get("domain") == "review"]
|
|
sales_matches = [index for index, order in enumerate(orders) if order.get("domain") == "sales"]
|
|
rental_matches = [index for index, order in enumerate(orders) if order.get("domain") == "rental"]
|
|
has_review_signal = self.contains_any_term(normalized, {"revisao", "agendamento", "agendar", "remarcar", "pos venda"})
|
|
has_sales_signal = self.contains_any_term(normalized, {"venda", "compra", "comprar", "pedido", "cancelamento", "cancelar", "carro", "veiculo"})
|
|
has_rental_signal = self.contains_any_term(normalized, {"aluguel", "locacao", "alugar", "locar", "devolucao", "frota"})
|
|
|
|
if len(review_matches) == 1 and has_review_signal and not has_sales_signal:
|
|
return review_matches[0], False
|
|
if len(sales_matches) == 1 and has_sales_signal and not has_review_signal:
|
|
return sales_matches[0], False
|
|
if len(rental_matches) == 1 and has_rental_signal and not has_review_signal and not has_sales_signal:
|
|
return rental_matches[0], False
|
|
return None, False
|
|
|
|
|
|
# É a função que efetivamente trata a resposta do usuário quando você perguntou “qual pedido quer fazer primeiro?”.
|
|
async def try_resolve_pending_order_selection(
|
|
self,
|
|
message: str,
|
|
user_id: int | None,
|
|
turn_decision: dict | None = None,
|
|
) -> str | None:
|
|
context = self.service._get_user_context(user_id)
|
|
if not context:
|
|
return None
|
|
pending = context.get("pending_order_selection")
|
|
if not isinstance(pending, dict):
|
|
return None
|
|
if pending.get("expires_at") and pending["expires_at"] < utc_now():
|
|
context["pending_order_selection"] = None
|
|
self._save_context(user_id=user_id, context=context)
|
|
return None
|
|
orders = pending.get("orders") or []
|
|
if len(orders) < 2:
|
|
context["pending_order_selection"] = None
|
|
self._save_context(user_id=user_id, context=context)
|
|
return None
|
|
|
|
decision_action = self._decision_action(turn_decision)
|
|
if decision_action == "clear_context" or self.is_order_selection_reset_message(message):
|
|
self.service._clear_user_conversation_state(user_id=user_id)
|
|
cleaned_message = self.remove_order_selection_reset_prefix(message)
|
|
if not cleaned_message:
|
|
return "Tudo bem. Limpei o contexto atual. Pode me dizer o que voce quer fazer agora?"
|
|
return await self.service.handle_message(cleaned_message, user_id=user_id)
|
|
|
|
if (
|
|
self.looks_like_fresh_operational_request(message, turn_decision=turn_decision)
|
|
and not self.is_explicit_pending_order_selection_message(message, turn_decision=turn_decision)
|
|
):
|
|
current_task_key = self.derive_operational_task_key(
|
|
message=message,
|
|
turn_decision=turn_decision,
|
|
)
|
|
matching_indexes = [
|
|
index
|
|
for index, order in enumerate(orders)
|
|
if current_task_key and self.derive_pending_order_task_key(order) == current_task_key
|
|
]
|
|
if len(matching_indexes) == 1:
|
|
selected_index = matching_indexes[0]
|
|
selected_order = orders[selected_index]
|
|
context["pending_order_selection"] = None
|
|
self.queue_pending_orders_for_later(
|
|
user_id=user_id,
|
|
orders=[order for index, order in enumerate(orders) if index != selected_index],
|
|
)
|
|
|
|
intro = f"Perfeito. Vou comecar por: {self.describe_order_selection_option(selected_order)}"
|
|
selected_memory = dict(selected_order.get("memory_seed") or {})
|
|
context["active_domain"] = selected_order.get("domain") or context.get("active_domain", "general")
|
|
if selected_memory:
|
|
context["generic_memory"] = selected_memory
|
|
self._save_context(user_id=user_id, context=context)
|
|
next_response = await self.service.handle_message(message, user_id=user_id)
|
|
return f"{intro}\n{next_response}"
|
|
|
|
context["pending_order_selection"] = None
|
|
self._save_context(user_id=user_id, context=context)
|
|
self.queue_pending_orders_for_later(
|
|
user_id=user_id,
|
|
orders=orders,
|
|
skip_task_key=current_task_key,
|
|
)
|
|
return None
|
|
|
|
selected_index, auto_selected = self.detect_selected_order_index(
|
|
message=message,
|
|
orders=orders,
|
|
turn_decision=turn_decision,
|
|
)
|
|
if selected_index is None:
|
|
return self.render_order_selection_prompt(orders)
|
|
|
|
selected_order = orders[selected_index]
|
|
context["pending_order_selection"] = None
|
|
self.queue_pending_orders_for_later(
|
|
user_id=user_id,
|
|
orders=[order for index, order in enumerate(orders) if index != selected_index],
|
|
)
|
|
|
|
intro = (
|
|
f"Vou escolher e comecar por: {self.describe_order_selection_option(selected_order)}"
|
|
if auto_selected
|
|
else f"Perfeito. Vou comecar por: {self.describe_order_selection_option(selected_order)}"
|
|
)
|
|
selected_memory = dict(selected_order.get("memory_seed") or {})
|
|
context["active_domain"] = selected_order.get("domain") or context.get("active_domain", "general")
|
|
if selected_memory:
|
|
context["generic_memory"] = selected_memory
|
|
self._save_context(user_id=user_id, context=context)
|
|
selected_message = str(selected_order.get("seed_message") or selected_order.get("message") or "")
|
|
next_response = await self.service.handle_message(selected_message, user_id=user_id)
|
|
return f"{intro}\n{next_response}"
|
|
|
|
|
|
# Cria o aviso de fila.
|
|
def render_queue_notice(self, queued_count: int) -> str | None:
|
|
if queued_count <= 0:
|
|
return None
|
|
if queued_count == 1:
|
|
return "Anotei mais 1 pedido e sigo nele quando voce disser 'continuar'."
|
|
return f"Anotei mais {queued_count} pedidos e sigo neles conforme voce for dizendo 'continuar'."
|
|
|
|
|
|
# Mostra ao usuário o que falta concluir no fluxo atual antes de mudar de assunto.
|
|
def render_open_flow_prompt(self, user_id: int | None, domain: str) -> str:
|
|
if domain == "review" and user_id is not None:
|
|
draft = self.service.state.get_entry("pending_review_drafts", user_id, expire=True)
|
|
if draft:
|
|
missing = [field for field in REVIEW_REQUIRED_FIELDS if field not in draft.get("payload", {})]
|
|
if missing:
|
|
return self.service._render_missing_review_fields_prompt(missing)
|
|
management_draft = self.service.state.get_entry("pending_review_management_drafts", user_id, expire=True)
|
|
if management_draft:
|
|
action = management_draft.get("action", "cancel")
|
|
payload = management_draft.get("payload", {})
|
|
if action == "reschedule":
|
|
missing = [field for field in ("protocolo", "nova_data_hora") if field not in payload]
|
|
if missing:
|
|
return self.service._render_missing_review_reschedule_fields_prompt(missing)
|
|
else:
|
|
missing = [field for field in ("protocolo",) if field not in payload]
|
|
if missing:
|
|
return self.service._render_missing_review_cancel_fields_prompt(missing)
|
|
if self.service.state.get_entry("pending_review_confirmations", user_id, expire=True):
|
|
return "Antes de mudar de assunto, me confirme se podemos concluir seu agendamento de revisao."
|
|
if self.service.state.get_entry("pending_review_reuse_confirmations", user_id, expire=True):
|
|
return self.service._render_review_reuse_question()
|
|
|
|
if domain == "sales" and user_id is not None:
|
|
draft = self.service.state.get_entry("pending_order_drafts", user_id, expire=True)
|
|
if draft:
|
|
missing = [field for field in ORDER_REQUIRED_FIELDS if field not in draft.get("payload", {})]
|
|
if missing:
|
|
return self.service._render_missing_order_fields_prompt(missing)
|
|
cancel_draft = self.service.state.get_entry("pending_cancel_order_drafts", user_id, expire=True)
|
|
if cancel_draft:
|
|
missing = [field for field in CANCEL_ORDER_REQUIRED_FIELDS if field not in cancel_draft.get("payload", {})]
|
|
if missing:
|
|
return self.service._render_missing_cancel_order_fields_prompt(missing)
|
|
|
|
return "Vamos concluir este assunto primeiro e ja sigo com o proximo em seguida."
|
|
|
|
|
|
# Cria a introdução para mudar de assunto de forma natural.
|
|
def build_next_order_transition(self, domain: str) -> str:
    """Return the sentence that introduces the next request according to its domain.

    Args:
        domain: Domain of the request about to be handled.

    Returns:
        A specific lead-in for "sales" and "review"; a generic one otherwise.
    """
    lead_ins = {
        "sales": "Agora, sobre a compra do veiculo:",
        "review": "Agora, sobre o agendamento da revisao:",
    }
    return lead_ins.get(domain, "Agora, sobre o proximo assunto:")
|
|
|
|
|
|
# Quando um fluxo termina, ela prepara a passagem para o próximo pedido da fila.
|
|
async def maybe_auto_advance_next_order(self, base_response: str, user_id: int | None) -> str:
|
|
context = self.service._get_user_context(user_id)
|
|
if not context:
|
|
return base_response
|
|
if context.get("pending_switch"):
|
|
return base_response
|
|
active_domain = context.get("active_domain", "general")
|
|
if self.has_open_flow(user_id=user_id, domain=active_domain):
|
|
return base_response
|
|
|
|
next_order = self.pop_next_order(user_id=user_id)
|
|
if not next_order:
|
|
return base_response
|
|
|
|
context["pending_switch"] = {
|
|
"source_domain": context.get("active_domain", "general"),
|
|
"target_domain": next_order["domain"],
|
|
"queued_message": next_order["message"],
|
|
"memory_seed": dict(next_order.get("memory_seed") or self.service._new_tab_memory(user_id=user_id)),
|
|
"expires_at": utc_now() + timedelta(minutes=15),
|
|
}
|
|
self._save_context(user_id=user_id, context=context)
|
|
transition = self.build_next_order_transition(next_order["domain"])
|
|
return (
|
|
f"{base_response}\n\n"
|
|
f"{transition}\n"
|
|
"Tenho um proximo pedido na fila. Quando quiser, diga 'continuar' para eu seguir nele."
|
|
)
|
|
|
|
|
|
# Converte intenções em um domínio principal de atendimento.
|
|
def domain_from_intents(self, intents: dict | None) -> str:
|
|
normalized = self.service.normalizer.normalize_intents(intents)
|
|
review_score = (
|
|
int(normalized.get("review_schedule", False))
|
|
+ int(normalized.get("review_list", False))
|
|
+ int(normalized.get("review_cancel", False))
|
|
+ int(normalized.get("review_reschedule", False))
|
|
)
|
|
sales_score = (
|
|
int(normalized.get("order_create", False))
|
|
+ int(normalized.get("order_list", False))
|
|
+ int(normalized.get("order_cancel", False))
|
|
)
|
|
if review_score > sales_score and review_score > 0:
|
|
return "review"
|
|
if sales_score > review_score and sales_score > 0:
|
|
return "sales"
|
|
return "general"
|
|
|
|
|
|
# Detecta comandos de continuação.
|
|
def is_context_switch_confirmation(self, message: str, turn_decision: dict | None = None) -> bool:
|
|
if self._decision_action(turn_decision) in {"continue_queue", "cancel_active_flow", "clear_context", "discard_queue"}:
|
|
return True
|
|
if self._decision_domain(turn_decision) in {"review", "sales", "rental"}:
|
|
return True
|
|
return self.service._is_affirmative_message(message) or self.service._is_negative_message(message)
|
|
|
|
def is_simple_confirmation_message(self, message: str) -> bool:
    """Tell whether the message is nothing more than a short yes/no style reply.

    The text is normalized by the service normalizer and then passed through
    ``strip_choice_message`` before being compared with a fixed vocabulary.

    Args:
        message: Raw user message.

    Returns:
        ``True`` when the cleaned text is exactly one of the known short replies.
    """
    short_replies = {
        "sim",
        "nao",
        "não",
        "ok",
        "pode",
        "confirmo",
        "aceito",
        "fechado",
        "pode sim",
        "tenho",
        "tenho sim",
        "negativo",
    }
    normalized_text = self.service.normalizer.normalize_text(message)
    cleaned = self.strip_choice_message(normalized_text)
    return cleaned in short_replies
|
|
|
|
|
|
# Detects whether the user is asking to continue with the next queued request.
|
|
def is_continue_queue_message(self, message: str, turn_decision: dict | None = None) -> bool:
|
|
if self._decision_action(turn_decision) == "continue_queue" or self._decision_intent(turn_decision) == "queue_continue":
|
|
return True
|
|
normalized = self.service.normalizer.normalize_text(message).strip()
|
|
normalized = re.sub(r"[.!?,;:]+$", "", normalized)
|
|
return normalized in {"continuar", "pode continuar", "seguir", "pode seguir", "proximo", "segue"}
|
|
|
|
|
|
# Executa o próximo pedido da fila quando o usuário disser “continuar”.
|
|
async def try_continue_queued_order(
|
|
self,
|
|
message: str,
|
|
user_id: int | None,
|
|
turn_decision: dict | None = None,
|
|
) -> str | None:
|
|
context = self.service._get_user_context(user_id)
|
|
if not context:
|
|
return None
|
|
pending_switch = context.get("pending_switch")
|
|
if not isinstance(pending_switch, dict):
|
|
return None
|
|
if pending_switch.get("expires_at") and pending_switch["expires_at"] < utc_now():
|
|
context["pending_switch"] = None
|
|
self._save_context(user_id=user_id, context=context)
|
|
return None
|
|
queued_message = str(pending_switch.get("queued_message") or "").strip()
|
|
if not queued_message:
|
|
return None
|
|
|
|
decision_action = self._decision_action(turn_decision)
|
|
if self.service._is_negative_message(message) and decision_action != "continue_queue":
|
|
context["pending_switch"] = None
|
|
self._save_context(user_id=user_id, context=context)
|
|
return "Tudo bem. Mantive o proximo pedido fora da fila por enquanto."
|
|
if not (
|
|
self.is_continue_queue_message(message, turn_decision=turn_decision)
|
|
or self.service._is_affirmative_message(message)
|
|
):
|
|
return None
|
|
|
|
target_domain = str(pending_switch.get("target_domain") or "general")
|
|
memory_seed = dict(pending_switch.get("memory_seed") or {})
|
|
self.apply_domain_switch(user_id=user_id, target_domain=target_domain)
|
|
refreshed = self.service._get_user_context(user_id)
|
|
if refreshed is not None:
|
|
refreshed["generic_memory"] = memory_seed
|
|
self._save_context(user_id=user_id, context=refreshed)
|
|
transition = self.build_next_order_transition(target_domain)
|
|
next_response = await self.service.handle_message(queued_message, user_id=user_id)
|
|
return f"{transition}\n{next_response}"
|
|
|
|
|
|
# Diz se ainda existe algo pendente antes de encerrar aquele assunto.
|
|
def has_open_flow(self, user_id: int | None, domain: str) -> bool:
|
|
if user_id is None:
|
|
return False
|
|
if domain == "review":
|
|
return bool(
|
|
self.service.state.get_entry("pending_review_drafts", user_id, expire=True)
|
|
or self.service.state.get_entry("pending_review_confirmations", user_id, expire=True)
|
|
or self.service.state.get_entry("pending_review_management_drafts", user_id, expire=True)
|
|
or self.service.state.get_entry("pending_review_reuse_confirmations", user_id, expire=True)
|
|
)
|
|
if domain == "sales":
|
|
return bool(
|
|
self.service.state.get_entry("pending_order_drafts", user_id, expire=True)
|
|
or self.service.state.get_entry("pending_cancel_order_drafts", user_id, expire=True)
|
|
)
|
|
if domain == "rental":
|
|
return bool(
|
|
self.service.state.get_entry("pending_rental_drafts", user_id, expire=True)
|
|
or self.service.state.get_entry("pending_rental_selections", user_id, expire=True)
|
|
)
|
|
return False
|
|
|
|
|
|
# Encerra o contexto anterior e troca oficialmente para o novo assunto.
|
|
def apply_domain_switch(self, user_id: int | None, target_domain: str) -> None:
|
|
context = self.service._get_user_context(user_id)
|
|
if not context:
|
|
return
|
|
previous_domain = context.get("active_domain", "general")
|
|
if previous_domain == "review":
|
|
self.service._reset_pending_review_states(user_id=user_id)
|
|
if previous_domain == "sales":
|
|
self.service._reset_pending_order_states(user_id=user_id)
|
|
if previous_domain == "rental":
|
|
self.service._reset_pending_rental_states(user_id=user_id)
|
|
context["active_domain"] = target_domain
|
|
context["generic_memory"] = self.service._new_tab_memory(user_id=user_id)
|
|
context["pending_order_selection"] = None
|
|
context["pending_switch"] = None
|
|
self._save_context(user_id=user_id, context=context)
|
|
|
|
|
|
# Controla a confirmação de “você quer mesmo sair deste assunto e ir para outro?”.
|
|
def handle_context_switch(
|
|
self,
|
|
message: str,
|
|
user_id: int | None,
|
|
target_domain_hint: str = "general",
|
|
turn_decision: dict | None = None,
|
|
) -> str | None:
|
|
context = self.service._get_user_context(user_id)
|
|
if not context:
|
|
return None
|
|
pending_switch = context.get("pending_switch")
|
|
if pending_switch:
|
|
if pending_switch["expires_at"] < utc_now():
|
|
context["pending_switch"] = None
|
|
self._save_context(user_id=user_id, context=context)
|
|
elif (
|
|
self._decision_domain(turn_decision) in {"review", "sales", "rental"}
|
|
and self._decision_domain(turn_decision) != pending_switch["target_domain"]
|
|
):
|
|
context["pending_switch"] = None
|
|
self._save_context(user_id=user_id, context=context)
|
|
elif self.is_context_switch_confirmation(message, turn_decision=turn_decision):
|
|
if self.service._is_affirmative_message(message) or self._decision_domain(turn_decision) == pending_switch["target_domain"]:
|
|
target_domain = pending_switch["target_domain"]
|
|
self.apply_domain_switch(user_id=user_id, target_domain=target_domain)
|
|
return self.render_context_switched_message(target_domain=target_domain)
|
|
context["pending_switch"] = None
|
|
self._save_context(user_id=user_id, context=context)
|
|
return "Perfeito, vamos continuar no fluxo atual."
|
|
|
|
pending_order_selection = context.get("pending_order_selection")
|
|
if pending_order_selection and pending_order_selection.get("expires_at") < utc_now():
|
|
context["pending_order_selection"] = None
|
|
self._save_context(user_id=user_id, context=context)
|
|
|
|
current_domain = context.get("active_domain", "general")
|
|
if target_domain_hint == "general" or target_domain_hint == current_domain:
|
|
return None
|
|
if not self.has_open_flow(user_id=user_id, domain=current_domain):
|
|
return None
|
|
if (
|
|
self.is_simple_confirmation_message(message)
|
|
and not self.looks_like_fresh_operational_request_from_text(message)
|
|
):
|
|
return None
|
|
|
|
context["pending_switch"] = {
|
|
"source_domain": current_domain,
|
|
"target_domain": target_domain_hint,
|
|
"expires_at": utc_now() + timedelta(minutes=15),
|
|
}
|
|
self._save_context(user_id=user_id, context=context)
|
|
return self.render_context_switch_confirmation(source_domain=current_domain, target_domain=target_domain_hint)
|
|
|
|
|
|
# Marca qual domínio está ativo atualmente.
|
|
def update_active_domain(self, user_id: int | None, domain_hint: str = "general") -> None:
|
|
context = self.service._get_user_context(user_id)
|
|
if context and domain_hint != "general":
|
|
context["active_domain"] = domain_hint
|
|
self._save_context(user_id=user_id, context=context)
|
|
|
|
|
|
# Serve para exibir o nome do domínio em mensagens para o usuário.
|
|
def domain_label(self, domain: str) -> str:
|
|
labels = {
|
|
"review": "agendamento de revisao",
|
|
"sales": "compra de veiculo",
|
|
"rental": "locacao de veiculo",
|
|
"general": "atendimento geral",
|
|
}
|
|
return labels.get(domain, "atendimento")
|
|
|
|
|
|
# É o prompt de confirmação da troca.
|
|
def render_context_switch_confirmation(self, source_domain: str, target_domain: str) -> str:
    """Build the question asking the user to confirm a change of topic.

    Args:
        source_domain: Domain the user would leave.
        target_domain: Domain the user would move to.

    Returns:
        The confirmation question, using the user-facing label of each domain.
    """
    leaving = self.domain_label(source_domain)
    entering = self.domain_label(target_domain)
    return f"Entendi que voce quer sair de {leaving} e ir para {entering}. Tem certeza?"
|
|
|
|
|
|
# Opening prompt that tells the user what to provide in the newly active domain.
|
|
def render_domain_onboarding_prompt(self, target_domain: str) -> str:
    """Return the opening prompt that tells the user what to provide next.

    Args:
        target_domain: Domain that has just become active.

    Returns:
        A domain-specific prompt for "sales", "review" and "rental"; an
        open-ended question for any other domain.
    """
    prompts = {
        "sales": "Pode me dizer a faixa de preco, o modelo ou o tipo de carro que voce procura.",
        "review": "Pode me informar a placa ou, se preferir, ja mandar placa, data/hora, modelo, ano, km e se ja fez revisao.",
        "rental": "Pode me dizer qual carro voce quer alugar e o periodo desejado para a locacao.",
    }
    return prompts.get(target_domain, "Pode me dizer o que voce quer fazer agora?")
|
|
|
|
def render_context_switched_message(self, target_domain: str) -> str:
    """Build the message shown right after the topic has been switched.

    Args:
        target_domain: Domain that is now active.

    Returns:
        A line announcing the new domain followed, on the next line, by that
        domain's onboarding prompt.
    """
    label = self.domain_label(target_domain)
    onboarding = self.render_domain_onboarding_prompt(target_domain)
    return f"Certo, contexto anterior encerrado. Vamos seguir com {label}.\n{onboarding}"
|
|
|
|
def _field_label(self, field_name: str) -> str:
    """Return the display label of a context field, or the field name itself when unmapped."""
    if field_name in CONTEXT_FIELD_LABELS:
        return CONTEXT_FIELD_LABELS[field_name]
    return field_name
|
|
|
|
def _task_label(self, active_task: str | None) -> str:
    """Return the display label of the active task.

    The lookup key is the task name trimmed and lower-cased; when no label is
    registered for it, the trimmed original text is returned (an empty string
    for ``None`` or blank input).
    """
    trimmed = str(active_task or "").strip()
    return ACTIVE_TASK_LABELS.get(trimmed.lower(), trimmed)
|
|
|
|
def _format_summary_value(self, field_name: str, value) -> str:
|
|
if isinstance(value, bool):
|
|
return "sim" if value else "nao"
|
|
if isinstance(value, (int, float)) and field_name in {"orcamento_max", "valor_veiculo", "preco"}:
|
|
amount = float(value)
|
|
formatted = f"{amount:,.0f}" if amount.is_integer() else f"{amount:,.2f}"
|
|
formatted = formatted.replace(",", "X").replace(".", ",").replace("X", ".")
|
|
return f"R$ {formatted}"
|
|
if isinstance(value, list):
|
|
rendered_items = [
|
|
self._format_summary_value(field_name=field_name, value=item)
|
|
for item in value
|
|
if item not in (None, "", [], {})
|
|
]
|
|
if not rendered_items:
|
|
return ""
|
|
if len(rendered_items) <= 3:
|
|
return ", ".join(rendered_items)
|
|
return f"{', '.join(rendered_items[:3])}, ..."
|
|
if isinstance(value, str):
|
|
compact = value.strip()
|
|
if len(compact) > 80:
|
|
return f"{compact[:77]}..."
|
|
return compact
|
|
return str(value)
|
|
|
|
def _summarize_payload(self, payload: dict | None, field_order: tuple[str, ...]) -> str:
|
|
if not isinstance(payload, dict):
|
|
return ""
|
|
fragments: list[str] = []
|
|
for field_name in field_order:
|
|
value = payload.get(field_name)
|
|
if value in (None, "", [], {}):
|
|
continue
|
|
rendered = self._format_summary_value(field_name=field_name, value=value)
|
|
if not rendered:
|
|
continue
|
|
fragments.append(f"{self._field_label(field_name)}={rendered}")
|
|
return ", ".join(fragments)
|
|
|
|
def _summarize_missing_fields(self, payload: dict | None, required_fields: tuple[str, ...]) -> str:
|
|
if not isinstance(payload, dict):
|
|
return ", ".join(self._field_label(field_name) for field_name in required_fields)
|
|
missing_fields = [self._field_label(field_name) for field_name in required_fields if field_name not in payload]
|
|
return ", ".join(missing_fields)
|
|
|
|
def _get_pending_entry(self, user_id: int | None, bucket: str, snapshot_key: str | None = None) -> dict | None:
|
|
entry = self.service.state.get_entry(bucket, user_id, expire=True)
|
|
if entry:
|
|
return entry
|
|
if user_id is None:
|
|
return None
|
|
context = self.service._get_user_context(user_id)
|
|
if not isinstance(context, dict):
|
|
return None
|
|
snapshots = context.get("flow_snapshots")
|
|
if not isinstance(snapshots, dict) or not snapshot_key:
|
|
return None
|
|
snapshot = snapshots.get(snapshot_key)
|
|
if not isinstance(snapshot, dict):
|
|
return None
|
|
expires_at = snapshot.get("expires_at")
|
|
if isinstance(expires_at, datetime) and expires_at < utc_now():
|
|
return None
|
|
return snapshot
|
|
|
|
|
|
# Serve para depuração, observabilidade ou até para alimentar outro componente com um resumo do estado atual.
|
|
def build_context_summary(self, user_id: int | None) -> str:
|
|
context = self.service._get_user_context(user_id)
|
|
if not context:
|
|
return "Contexto de conversa: sem contexto ativo."
|
|
summary = [f"Contexto de conversa ativo: {self.domain_label(context.get('active_domain', 'general'))}."]
|
|
active_task = str(context.get("active_task") or "").strip()
|
|
if active_task:
|
|
summary.append(f"Fluxo ativo: {self._task_label(active_task)}.")
|
|
memory = context.get("generic_memory", {})
|
|
if memory:
|
|
summarized_memory = self._summarize_payload(memory, GENERIC_MEMORY_SUMMARY_FIELDS)
|
|
if summarized_memory:
|
|
summary.append(f"Memoria generica temporaria: {summarized_memory}.")
|
|
selected_vehicle = context.get("selected_vehicle")
|
|
if isinstance(selected_vehicle, dict) and selected_vehicle.get("modelo"):
|
|
summary.append(f"Veiculo selecionado para compra: {selected_vehicle.get('modelo')}.")
|
|
selected_rental_vehicle = context.get("selected_rental_vehicle")
|
|
if isinstance(selected_rental_vehicle, dict) and selected_rental_vehicle.get("modelo"):
|
|
summary.append(f"Veiculo selecionado para locacao: {selected_rental_vehicle.get('modelo')}.")
|
|
stock_results = context.get("last_stock_results") or []
|
|
if isinstance(stock_results, list) and stock_results:
|
|
summary.append(f"Ultima consulta de estoque com {len(stock_results)} opcao(oes) disponivel(is).")
|
|
rental_results = context.get("last_rental_results") or []
|
|
if isinstance(rental_results, list) and rental_results:
|
|
summary.append(f"Ultima consulta de locacao com {len(rental_results)} opcao(oes) disponivel(is).")
|
|
last_tool_result = context.get("last_tool_result")
|
|
if isinstance(last_tool_result, dict) and last_tool_result.get("tool_name"):
|
|
tool_name = str(last_tool_result.get("tool_name") or "").strip()
|
|
result_type = str(last_tool_result.get("result_type") or "").strip()
|
|
if tool_name and result_type:
|
|
summary.append(f"Ultima tool executada: {tool_name} ({result_type}).")
|
|
elif tool_name:
|
|
summary.append(f"Ultima tool executada: {tool_name}.")
|
|
pending_order_selection = context.get("pending_order_selection")
|
|
if isinstance(pending_order_selection, dict):
|
|
orders = pending_order_selection.get("orders")
|
|
if isinstance(orders, list) and orders:
|
|
summary.append(f"Aguardando escolha entre {len(orders)} pedido(s) detectado(s) na mesma mensagem.")
|
|
pending_switch = context.get("pending_switch")
|
|
if isinstance(pending_switch, dict):
|
|
target_domain = str(pending_switch.get("target_domain") or "general")
|
|
summary.append(f"Troca de contexto pendente para {self.domain_label(target_domain)}.")
|
|
order_queue = context.get("order_queue", [])
|
|
if order_queue:
|
|
summary.append(f"Fila de pedidos pendentes: {len(order_queue)}.")
|
|
pending_single_vehicle = context.get("pending_single_vehicle_confirmation")
|
|
if isinstance(pending_single_vehicle, dict) and pending_single_vehicle.get("modelo"):
|
|
summary.append(
|
|
f"Aguardando confirmacao explicita do veiculo {pending_single_vehicle.get('modelo')}."
|
|
)
|
|
|
|
review_draft = self._get_pending_entry(user_id, "pending_review_drafts", "review_schedule")
|
|
if isinstance(review_draft, dict):
|
|
payload = review_draft.get("payload")
|
|
known_fields = self._summarize_payload(payload, REVIEW_SUMMARY_FIELDS)
|
|
missing_fields = self._summarize_missing_fields(payload, REVIEW_REQUIRED_FIELDS)
|
|
draft_summary = "Rascunho aberto de agendamento de revisao."
|
|
if known_fields:
|
|
draft_summary = f"{draft_summary} Dados atuais: {known_fields}."
|
|
if missing_fields:
|
|
draft_summary = f"{draft_summary} Faltando: {missing_fields}."
|
|
summary.append(draft_summary)
|
|
|
|
review_management_draft = self._get_pending_entry(
|
|
user_id,
|
|
"pending_review_management_drafts",
|
|
"review_management",
|
|
)
|
|
if isinstance(review_management_draft, dict):
|
|
action = str(review_management_draft.get("action") or "cancel").strip().lower()
|
|
payload = review_management_draft.get("payload")
|
|
known_fields = self._summarize_payload(payload, REVIEW_MANAGEMENT_SUMMARY_FIELDS)
|
|
if action == "reschedule":
|
|
missing_fields = self._summarize_missing_fields(payload, ("protocolo", "nova_data_hora"))
|
|
draft_summary = "Rascunho aberto de remarcacao de revisao."
|
|
else:
|
|
missing_fields = self._summarize_missing_fields(payload, ("protocolo",))
|
|
draft_summary = "Rascunho aberto de cancelamento de revisao."
|
|
if known_fields:
|
|
draft_summary = f"{draft_summary} Dados atuais: {known_fields}."
|
|
if missing_fields:
|
|
draft_summary = f"{draft_summary} Faltando: {missing_fields}."
|
|
summary.append(draft_summary)
|
|
|
|
review_confirmation = self._get_pending_entry(
|
|
user_id,
|
|
"pending_review_confirmations",
|
|
"review_confirmation",
|
|
)
|
|
if isinstance(review_confirmation, dict):
|
|
payload = review_confirmation.get("payload")
|
|
known_fields = self._summarize_payload(payload, REVIEW_SUMMARY_FIELDS)
|
|
confirmation_summary = "Confirmacao pendente de horario sugerido para revisao."
|
|
if known_fields:
|
|
confirmation_summary = f"{confirmation_summary} Dados sugeridos: {known_fields}."
|
|
summary.append(confirmation_summary)
|
|
|
|
review_reuse = self._get_pending_entry(
|
|
user_id,
|
|
"pending_review_reuse_confirmations",
|
|
"review_reuse_confirmation",
|
|
)
|
|
if isinstance(review_reuse, dict):
|
|
payload = review_reuse.get("payload")
|
|
known_fields = self._summarize_payload(payload, REVIEW_SUMMARY_FIELDS)
|
|
reuse_summary = "Confirmacao pendente para reutilizar dados da ultima revisao."
|
|
if known_fields:
|
|
reuse_summary = f"{reuse_summary} Dados reaproveitaveis: {known_fields}."
|
|
summary.append(reuse_summary)
|
|
|
|
order_draft = self._get_pending_entry(user_id, "pending_order_drafts", "order_create")
|
|
if isinstance(order_draft, dict):
|
|
payload = order_draft.get("payload")
|
|
known_fields = self._summarize_payload(payload, ORDER_SUMMARY_FIELDS)
|
|
missing_fields = self._summarize_missing_fields(payload, ORDER_REQUIRED_FIELDS)
|
|
draft_summary = "Rascunho aberto de criacao de pedido."
|
|
if known_fields:
|
|
draft_summary = f"{draft_summary} Dados atuais: {known_fields}."
|
|
if missing_fields:
|
|
draft_summary = f"{draft_summary} Faltando: {missing_fields}."
|
|
summary.append(draft_summary)
|
|
|
|
cancel_order_draft = self._get_pending_entry(user_id, "pending_cancel_order_drafts", "order_cancel")
|
|
if isinstance(cancel_order_draft, dict):
|
|
payload = cancel_order_draft.get("payload")
|
|
known_fields = self._summarize_payload(payload, CANCEL_ORDER_SUMMARY_FIELDS)
|
|
missing_fields = self._summarize_missing_fields(payload, CANCEL_ORDER_REQUIRED_FIELDS)
|
|
draft_summary = "Rascunho aberto de cancelamento de pedido."
|
|
if known_fields:
|
|
draft_summary = f"{draft_summary} Dados atuais: {known_fields}."
|
|
if missing_fields:
|
|
draft_summary = f"{draft_summary} Faltando: {missing_fields}."
|
|
summary.append(draft_summary)
|
|
|
|
stock_selection = self._get_pending_entry(user_id, "pending_stock_selections")
|
|
if isinstance(stock_selection, dict):
|
|
payload = stock_selection.get("payload")
|
|
if isinstance(payload, list) and payload:
|
|
summary.append(f"Aguardando escolha de veiculo em {len(payload)} opcao(oes) de estoque.")
|
|
|
|
rental_draft = self._get_pending_entry(user_id, "pending_rental_drafts")
|
|
if isinstance(rental_draft, dict):
|
|
payload = rental_draft.get("payload")
|
|
known_fields = self._summarize_payload(payload, ("placa", "rental_vehicle_id", "data_inicio", "data_fim_prevista", "cpf"))
|
|
missing_fields = self._summarize_missing_fields(payload, ("rental_vehicle_id", "data_inicio", "data_fim_prevista"))
|
|
draft_summary = "Rascunho aberto de locacao."
|
|
if known_fields:
|
|
draft_summary = f"{draft_summary} Dados atuais: {known_fields}."
|
|
if missing_fields:
|
|
draft_summary = f"{draft_summary} Faltando: {missing_fields}."
|
|
summary.append(draft_summary)
|
|
|
|
rental_selection = self._get_pending_entry(user_id, "pending_rental_selections")
|
|
if isinstance(rental_selection, dict):
|
|
payload = rental_selection.get("payload")
|
|
if isinstance(payload, list) and payload:
|
|
summary.append(f"Aguardando escolha de veiculo em {len(payload)} opcao(oes) de locacao.")
|
|
return " ".join(summary)
|