♻️ refactor(orchestration): extrair componentes do orquestrador

Move a normalização, o planejamento de mensagens, a política conversacional e a execução de tools para módulos dedicados.

Reduz o acoplamento do OrquestradorService e prepara a base para evoluções futuras com menos risco de regressão.
main
parent c52707deb1
commit 14778fac0b

@ -0,0 +1,655 @@
import re
from datetime import datetime, timedelta
from typing import TYPE_CHECKING
from app.services.orchestration.orchestrator_config import (
CANCEL_ORDER_REQUIRED_FIELDS,
ORDER_REQUIRED_FIELDS,
PENDING_ORDER_SELECTION_TTL_MINUTES,
REVIEW_REQUIRED_FIELDS,
)
if TYPE_CHECKING:
from app.services.orchestration.orquestrador_service import OrquestradorService
# essa classe é responsável por controlar qual o assunto está ativo na conversa, se existe fluxo aberto, se o usuário mandou dois pedidos ao mesmo tempo...
class ConversationPolicy:
def __init__(self, service: "OrquestradorService"):
self.service = service
# Essa função serve para reaproveitar informações já informadas antes, evitando pedir novamente ao usuário.
def try_prefill_review_fields_from_memory(self, user_id: int | None, payload: dict) -> None:
if user_id is None:
return
context = self.service._get_user_context(user_id)
if not context:
return
memory = context.get("generic_memory", {})
if payload.get("placa") is None:
plate = self.service.normalizer.normalize_plate(memory.get("placa"))
if plate:
payload["placa"] = plate
# Essa função coloca um pedido na fila
def queue_order(self, user_id: int | None, domain: str, order_message: str) -> None:
self.queue_order_with_memory_seed(
user_id=user_id,
domain=domain,
order_message=order_message,
memory_seed=self.service._new_tab_memory(user_id=user_id),
)
# Enfileira um próximo assunto para ser tratado depois, preservando dados úteis daquele pedido
def queue_order_with_memory_seed(
self,
user_id: int | None,
domain: str,
order_message: str,
memory_seed: dict | None = None,
) -> None:
context = self.service._get_user_context(user_id)
if not context or domain == "general":
return
queue = context.setdefault("order_queue", [])
queue.append(
{
"domain": domain,
"message": (order_message or "").strip(),
"memory_seed": dict(memory_seed or self.service._new_tab_memory(user_id=user_id)),
"created_at": datetime.utcnow().isoformat(),
}
)
# Transforma as entidades extraídas de um pedido em uma memória temporária pronta para usar quando esse pedido for processado.
def build_order_memory_seed(self, user_id: int | None, order: dict | None = None) -> dict:
seed = dict(self.service._new_tab_memory(user_id=user_id))
if not isinstance(order, dict):
return seed
entities = order.get("entities")
if not isinstance(entities, dict):
return seed
generic_memory = self.service.normalizer.normalize_generic_fields(entities.get("generic_memory"))
if generic_memory:
seed.update(generic_memory)
review_fields = self.service.normalizer.normalize_review_fields(entities.get("review_fields"))
if review_fields.get("placa") and not seed.get("placa"):
seed["placa"] = review_fields["placa"]
order_fields = self.service.normalizer.normalize_order_fields(entities.get("order_fields"))
if order_fields.get("cpf") and not seed.get("cpf"):
seed["cpf"] = order_fields["cpf"]
if order_fields.get("valor_veiculo") and not seed.get("orcamento_max"):
seed["orcamento_max"] = int(round(order_fields["valor_veiculo"]))
return seed
# Pega o próximo assunto pendente do usuário.
def pop_next_order(self, user_id: int | None) -> dict | None:
context = self.service._get_user_context(user_id)
if not context:
return None
queue = context.setdefault("order_queue", [])
if not queue:
return None
return queue.pop(0)
# Decide qual pedido deve ser processado agora, qual vai para fila, e se o usuário precisa escolher entre dois pedidos.
def prepare_message_for_single_order(
self,
message: str,
user_id: int | None,
routing_plan: dict | None = None,
) -> tuple[str, str | None, str | None]:
context = self.service._get_user_context(user_id)
if not context:
return message, None, None
queue_notice = None
active_domain = context.get("active_domain", "general")
orders_raw = (routing_plan or {}).get("orders") if isinstance(routing_plan, dict) else None
extracted_orders: list[dict] = []
if isinstance(orders_raw, list):
for item in orders_raw:
if not isinstance(item, dict):
continue
domain = str(item.get("domain") or "general").strip().lower()
if domain not in {"review", "sales", "general"}:
domain = "general"
segment = str(item.get("message") or "").strip()
if segment:
extracted_orders.append(
{
"domain": domain,
"message": segment,
"entities": self.service._coerce_extraction_contract(item.get("entities")),
}
)
if not extracted_orders:
extracted_orders = [{"domain": "general", "message": (message or "").strip()}]
if (
len(extracted_orders) == 2
and all(order["domain"] != "general" for order in extracted_orders)
and not self.has_open_flow(user_id=user_id, domain=active_domain)
):
self.store_pending_order_selection(user_id=user_id, orders=extracted_orders)
return message, None, self.render_order_selection_prompt(extracted_orders)
if len(extracted_orders) <= 1:
inferred = extracted_orders[0]["domain"]
if (
inferred != "general"
and inferred != active_domain
and self.has_open_flow(user_id=user_id, domain=active_domain)
):
self.queue_order(user_id=user_id, domain=inferred, order_message=message)
queue_hint = self.render_queue_notice(1)
prompt = self.render_open_flow_prompt(user_id=user_id, domain=active_domain)
return message, None, f"{prompt}\n{queue_hint}" if queue_hint else prompt
return message, None, None
if self.has_open_flow(user_id=user_id, domain=active_domain):
queued_count = 0
for queued in extracted_orders:
if queued["domain"] != active_domain:
self.queue_order_with_memory_seed(
user_id=user_id,
domain=queued["domain"],
order_message=queued["message"],
memory_seed=self.build_order_memory_seed(user_id=user_id, order=queued),
)
queued_count += 1
queue_hint = self.render_queue_notice(queued_count)
prompt = self.render_open_flow_prompt(user_id=user_id, domain=active_domain)
return message, None, f"{prompt}\n{queue_hint}" if queue_hint else prompt
first = extracted_orders[0]
queued_count = 0
for queued in extracted_orders[1:]:
self.queue_order_with_memory_seed(
user_id=user_id,
domain=queued["domain"],
order_message=queued["message"],
memory_seed=self.build_order_memory_seed(user_id=user_id, order=queued),
)
queued_count += 1
context["active_domain"] = first["domain"]
context["generic_memory"] = self.build_order_memory_seed(user_id=user_id, order=first)
queue_notice = self.render_queue_notice(queued_count)
return first["message"], queue_notice, None
# Serve para concatenar mensagens auxiliares com a resposta principal.
def compose_order_aware_response(self, response: str, queue_notice: str | None = None) -> str:
lines = []
if queue_notice:
lines.append(queue_notice)
lines.append(response)
return "\n".join(lines)
# Armazena uma “decisão pendente” de qual pedido começar.
def store_pending_order_selection(self, user_id: int | None, orders: list[dict]) -> None:
context = self.service._get_user_context(user_id)
if not context:
return
context["pending_order_selection"] = {
"orders": [
{
"domain": order["domain"],
"message": order["message"],
"memory_seed": self.build_order_memory_seed(user_id=user_id, order=order),
}
for order in orders[:2]
],
"expires_at": datetime.utcnow() + timedelta(minutes=PENDING_ORDER_SELECTION_TTL_MINUTES),
}
# Cria o texto de escolha para o usuário.
def render_order_selection_prompt(self, orders: list[dict]) -> str:
if len(orders) < 2:
return "Qual das acoes voce quer iniciar primeiro?"
first_label = self.describe_order_selection_option(orders[0])
second_label = self.describe_order_selection_option(orders[1])
return (
"Identifiquei duas acoes na sua mensagem:\n"
f"1. {first_label}\n"
f"2. {second_label}\n"
"Qual delas voce quer iniciar primeiro? Se for indiferente, eu escolho."
)
# Formata o rótulo do pedido para exibição.
def describe_order_selection_option(self, order: dict) -> str:
domain = str(order.get("domain") or "general")
message = str(order.get("message") or "").strip()
domain_prefix = {
"review": "Revisao",
"sales": "Venda",
"general": "Atendimento",
}.get(domain, "Atendimento")
return f"{domain_prefix}: {message}"
# É um helper simples para busca de palavras-chave
def contains_any_term(self, text: str, terms: set[str]) -> bool:
return any(term in text for term in terms)
# Limpa a mensagem para facilitar a detecção de escolha
def strip_choice_message(self, text: str) -> str:
cleaned = (text or "").strip()
while cleaned and cleaned[-1] in ".!?,;:":
cleaned = cleaned[:-1].rstrip()
return cleaned
# Se o usuário disser algo como: "esquece tudo e quero agendar revisão" a função remove a parte de reset e devolve só o novo pedido.
def remove_order_selection_reset_prefix(self, message: str) -> str:
raw = (message or "").strip()
normalized = self.service.normalizer.normalize_text(raw)
prefixes = ("esqueca tudo agora", "esqueca tudo", "esquece tudo agora", "esquece tudo")
for prefix in prefixes:
if normalized.startswith(prefix):
return raw[len(prefix):].lstrip(" ,.:;-")
return raw
# Detecta intenção de abandonar o contexto atual
def is_order_selection_reset_message(self, message: str) -> bool:
normalized = self.service.normalizer.normalize_text(message).strip()
reset_terms = {
"esqueca tudo",
"esqueca tudo agora",
"esquece tudo",
"esquece tudo agora",
"ignora isso",
"ignore isso",
"deixa isso",
"deixa pra la",
"deixa para la",
"novo assunto",
"muda de assunto",
"vamos comecar de novo",
"comecar de novo",
"reiniciar",
"resetar conversa",
}
return self.contains_any_term(normalized, reset_terms)
# Ajuda a perceber quando o usuário talvez tenha mudado de assunto sem responder à pergunta de escolha
def looks_like_fresh_operational_request(self, message: str) -> bool:
normalized = self.service.normalizer.normalize_text(message).strip()
if len(normalized) < 15:
return False
operational_terms = {
"agendar",
"revisao",
"cancelar",
"pedido",
"comprar",
"compra",
"carro",
"veiculo",
"remarcar",
"tambem",
}
return self.contains_any_term(normalized, operational_terms)
# Interpreta a resposta do usuário na etapa de seleção.
def detect_selected_order_index(self, message: str, orders: list[dict]) -> tuple[int | None, bool]:
normalized = self.strip_choice_message(self.service.normalizer.normalize_text(message))
indifferent_tokens = {
"tanto faz",
"indiferente",
"qualquer um",
"qualquer uma",
"voce escolhe",
"pode escolher",
"fica a seu criterio",
}
if normalized in indifferent_tokens:
return 0, True
if normalized in {"1", "primeiro", "primeira", "opcao 1", "acao 1", "pedido 1"}:
return 0, False
if normalized in {"2", "segundo", "segunda", "opcao 2", "acao 2", "pedido 2"}:
return 1, False
review_matches = [index for index, order in enumerate(orders) if order.get("domain") == "review"]
sales_matches = [index for index, order in enumerate(orders) if order.get("domain") == "sales"]
has_review_signal = self.contains_any_term(normalized, {"revisao", "agendamento", "agendar", "remarcar", "pos venda"})
has_sales_signal = self.contains_any_term(normalized, {"venda", "compra", "comprar", "pedido", "cancelamento", "cancelar", "carro", "veiculo"})
if len(review_matches) == 1 and has_review_signal and not has_sales_signal:
return review_matches[0], False
if len(sales_matches) == 1 and has_sales_signal and not has_review_signal:
return sales_matches[0], False
return None, False
# É a função que efetivamente trata a resposta do usuário quando você perguntou “qual pedido quer fazer primeiro?”.
async def try_resolve_pending_order_selection(self, message: str, user_id: int | None) -> str | None:
context = self.service._get_user_context(user_id)
if not context:
return None
pending = context.get("pending_order_selection")
if not isinstance(pending, dict):
return None
if pending.get("expires_at") and pending["expires_at"] < datetime.utcnow():
context["pending_order_selection"] = None
return None
orders = pending.get("orders") or []
if len(orders) < 2:
context["pending_order_selection"] = None
return None
if self.is_order_selection_reset_message(message):
self.service._clear_user_conversation_state(user_id=user_id)
cleaned_message = self.remove_order_selection_reset_prefix(message)
if not cleaned_message:
return "Tudo bem. Limpei o contexto atual. Pode me dizer o que voce quer fazer agora?"
return await self.service.handle_message(cleaned_message, user_id=user_id)
selected_index, auto_selected = self.detect_selected_order_index(message=message, orders=orders)
if selected_index is None:
if self.looks_like_fresh_operational_request(message):
context["pending_order_selection"] = None
return None
return self.render_order_selection_prompt(orders)
selected_order = orders[selected_index]
remaining_order = orders[1 - selected_index]
context["pending_order_selection"] = None
self.queue_order_with_memory_seed(
user_id=user_id,
domain=remaining_order["domain"],
order_message=remaining_order["message"],
memory_seed=remaining_order.get("memory_seed"),
)
intro = (
f"Vou escolher e comecar por: {self.describe_order_selection_option(selected_order)}"
if auto_selected
else f"Perfeito. Vou comecar por: {self.describe_order_selection_option(selected_order)}"
)
selected_memory = dict(selected_order.get("memory_seed") or {})
if selected_memory:
context["generic_memory"] = selected_memory
next_response = await self.service.handle_message(str(selected_order.get("message") or ""), user_id=user_id)
return f"{intro}\n{next_response}"
# Cria o aviso de fila.
def render_queue_notice(self, queued_count: int) -> str | None:
if queued_count <= 0:
return None
if queued_count == 1:
return "Anotei mais 1 pedido e sigo nele quando voce disser 'continuar'."
return f"Anotei mais {queued_count} pedidos e sigo neles conforme voce for dizendo 'continuar'."
# Mostra ao usuário o que falta concluir no fluxo atual antes de mudar de assunto.
def render_open_flow_prompt(self, user_id: int | None, domain: str) -> str:
if domain == "review" and user_id is not None:
draft = self.service.state.get_entry("pending_review_drafts", user_id, expire=True)
if draft:
missing = [field for field in REVIEW_REQUIRED_FIELDS if field not in draft.get("payload", {})]
if missing:
return self.service._render_missing_review_fields_prompt(missing)
management_draft = self.service.state.get_entry("pending_review_management_drafts", user_id, expire=True)
if management_draft:
action = management_draft.get("action", "cancel")
payload = management_draft.get("payload", {})
if action == "reschedule":
missing = [field for field in ("protocolo", "nova_data_hora") if field not in payload]
if missing:
return self.service._render_missing_review_reschedule_fields_prompt(missing)
else:
missing = [field for field in ("protocolo",) if field not in payload]
if missing:
return self.service._render_missing_review_cancel_fields_prompt(missing)
if self.service.state.get_entry("pending_review_confirmations", user_id, expire=True):
return "Antes de mudar de assunto, me confirme se podemos concluir seu agendamento de revisao."
if self.service.state.get_entry("pending_review_reuse_confirmations", user_id, expire=True):
return self.service._render_review_reuse_question()
if domain == "sales" and user_id is not None:
draft = self.service.state.get_entry("pending_order_drafts", user_id, expire=True)
if draft:
missing = [field for field in ORDER_REQUIRED_FIELDS if field not in draft.get("payload", {})]
if missing:
return self.service._render_missing_order_fields_prompt(missing)
cancel_draft = self.service.state.get_entry("pending_cancel_order_drafts", user_id, expire=True)
if cancel_draft:
missing = [field for field in CANCEL_ORDER_REQUIRED_FIELDS if field not in cancel_draft.get("payload", {})]
if missing:
return self.service._render_missing_cancel_order_fields_prompt(missing)
return "Vamos concluir este assunto primeiro e ja sigo com o proximo em seguida."
# Cria a introdução para mudar de assunto de forma natural.
def build_next_order_transition(self, domain: str) -> str:
if domain == "sales":
return "Agora, sobre a compra do veiculo:"
if domain == "review":
return "Agora, sobre o agendamento da revisao:"
return "Agora, sobre o proximo assunto:"
# Quando um fluxo termina, ela prepara a passagem para o próximo pedido da fila.
async def maybe_auto_advance_next_order(self, base_response: str, user_id: int | None) -> str:
context = self.service._get_user_context(user_id)
if not context:
return base_response
if context.get("pending_switch"):
return base_response
active_domain = context.get("active_domain", "general")
if self.has_open_flow(user_id=user_id, domain=active_domain):
return base_response
next_order = self.pop_next_order(user_id=user_id)
if not next_order:
return base_response
context["pending_switch"] = {
"source_domain": context.get("active_domain", "general"),
"target_domain": next_order["domain"],
"queued_message": next_order["message"],
"memory_seed": dict(next_order.get("memory_seed") or self.service._new_tab_memory(user_id=user_id)),
"expires_at": datetime.utcnow() + timedelta(minutes=15),
}
transition = self.build_next_order_transition(next_order["domain"])
return (
f"{base_response}\n\n"
f"{transition}\n"
"Tenho um proximo pedido na fila. Quando quiser, diga 'continuar' para eu seguir nele."
)
# Converte intenções em um domínio principal de atendimento.
def domain_from_intents(self, intents: dict | None) -> str:
normalized = self.service.normalizer.normalize_intents(intents)
review_score = (
int(normalized.get("review_schedule", False))
+ int(normalized.get("review_list", False))
+ int(normalized.get("review_cancel", False))
+ int(normalized.get("review_reschedule", False))
)
sales_score = int(normalized.get("order_create", False)) + int(normalized.get("order_cancel", False))
if review_score > sales_score and review_score > 0:
return "review"
if sales_score > review_score and sales_score > 0:
return "sales"
return "general"
# Detecta comandos de continuação.
def is_context_switch_confirmation(self, message: str) -> bool:
return self.service._is_affirmative_message(message) or self.service._is_negative_message(message)
# Executa o próximo pedido da fila quando o usuário disser “continuar”.
def is_continue_queue_message(self, message: str) -> bool:
normalized = self.service.normalizer.normalize_text(message).strip()
normalized = re.sub(r"[.!?,;:]+$", "", normalized)
return normalized in {"continuar", "pode continuar", "seguir", "pode seguir", "proximo", "segue"}
# Executa o próximo pedido da fila quando o usuário disser “continuar”.
async def try_continue_queued_order(self, message: str, user_id: int | None) -> str | None:
context = self.service._get_user_context(user_id)
if not context:
return None
pending_switch = context.get("pending_switch")
if not isinstance(pending_switch, dict):
return None
if pending_switch.get("expires_at") and pending_switch["expires_at"] < datetime.utcnow():
context["pending_switch"] = None
return None
queued_message = str(pending_switch.get("queued_message") or "").strip()
if not queued_message:
return None
if self.service._is_negative_message(message):
context["pending_switch"] = None
return "Tudo bem. Mantive o proximo pedido fora da fila por enquanto."
if not (self.is_continue_queue_message(message) or self.service._is_affirmative_message(message)):
return None
target_domain = str(pending_switch.get("target_domain") or "general")
memory_seed = dict(pending_switch.get("memory_seed") or {})
self.apply_domain_switch(user_id=user_id, target_domain=target_domain)
refreshed = self.service._get_user_context(user_id)
if refreshed is not None:
refreshed["generic_memory"] = memory_seed
transition = self.build_next_order_transition(target_domain)
next_response = await self.service.handle_message(queued_message, user_id=user_id)
return f"{transition}\n{next_response}"
# Diz se ainda existe algo pendente antes de encerrar aquele assunto.
def has_open_flow(self, user_id: int | None, domain: str) -> bool:
if user_id is None:
return False
if domain == "review":
return bool(
self.service.state.get_entry("pending_review_drafts", user_id, expire=True)
or self.service.state.get_entry("pending_review_confirmations", user_id, expire=True)
or self.service.state.get_entry("pending_review_management_drafts", user_id, expire=True)
or self.service.state.get_entry("pending_review_reuse_confirmations", user_id, expire=True)
)
if domain == "sales":
return bool(
self.service.state.get_entry("pending_order_drafts", user_id, expire=True)
or self.service.state.get_entry("pending_cancel_order_drafts", user_id, expire=True)
)
return False
# Encerra o contexto anterior e troca oficialmente para o novo assunto.
def apply_domain_switch(self, user_id: int | None, target_domain: str) -> None:
context = self.service._get_user_context(user_id)
if not context:
return
previous_domain = context.get("active_domain", "general")
if previous_domain == "review":
self.service._reset_pending_review_states(user_id=user_id)
if previous_domain == "sales":
self.service._reset_pending_order_states(user_id=user_id)
context["active_domain"] = target_domain
context["generic_memory"] = self.service._new_tab_memory(user_id=user_id)
context["pending_order_selection"] = None
context["pending_switch"] = None
# Controla a confirmação de “você quer mesmo sair deste assunto e ir para outro?”.
def handle_context_switch(self, message: str, user_id: int | None, target_domain_hint: str = "general") -> str | None:
context = self.service._get_user_context(user_id)
if not context:
return None
pending_switch = context.get("pending_switch")
if pending_switch:
if pending_switch["expires_at"] < datetime.utcnow():
context["pending_switch"] = None
elif self.is_context_switch_confirmation(message):
if self.service._is_affirmative_message(message):
target_domain = pending_switch["target_domain"]
self.apply_domain_switch(user_id=user_id, target_domain=target_domain)
return self.render_context_switched_message(target_domain=target_domain)
context["pending_switch"] = None
return "Perfeito, vamos continuar no fluxo atual."
pending_order_selection = context.get("pending_order_selection")
if pending_order_selection and pending_order_selection.get("expires_at") < datetime.utcnow():
context["pending_order_selection"] = None
current_domain = context.get("active_domain", "general")
if target_domain_hint == "general" or target_domain_hint == current_domain:
return None
if not self.has_open_flow(user_id=user_id, domain=current_domain):
return None
context["pending_switch"] = {
"source_domain": current_domain,
"target_domain": target_domain_hint,
"expires_at": datetime.utcnow() + timedelta(minutes=15),
}
return self.render_context_switch_confirmation(source_domain=current_domain, target_domain=target_domain_hint)
# Marca qual domínio está ativo atualmente.
def update_active_domain(self, user_id: int | None, domain_hint: str = "general") -> None:
context = self.service._get_user_context(user_id)
if context and domain_hint != "general":
context["active_domain"] = domain_hint
# Serve para exibir o nome do domínio em mensagens para o usuário.
def domain_label(self, domain: str) -> str:
labels = {
"review": "agendamento de revisao",
"sales": "compra de veiculo",
"general": "atendimento geral",
}
return labels.get(domain, "atendimento")
# É o prompt de confirmação da troca.
def render_context_switch_confirmation(self, source_domain: str, target_domain: str) -> str:
return (
f"Entendi que voce quer sair de {self.domain_label(source_domain)} "
f"e ir para {self.domain_label(target_domain)}. Tem certeza?"
)
#Mensagem exibida após a troca acontecer.
def render_context_switched_message(self, target_domain: str) -> str:
return f"Certo, contexto anterior encerrado. Vamos seguir com {self.domain_label(target_domain)}."
# Serve para depuração, observabilidade ou até para alimentar outro componente com um resumo do estado atual.
def build_context_summary(self, user_id: int | None) -> str:
context = self.service._get_user_context(user_id)
if not context:
return "Contexto de conversa: sem contexto ativo."
summary = [f"Contexto de conversa ativo: {self.domain_label(context.get('active_domain', 'general'))}."]
memory = context.get("generic_memory", {})
if memory:
summary.append(f"Memoria generica temporaria: {memory}.")
order_queue = context.get("order_queue", [])
if order_queue:
summary.append(f"Fila de pedidos pendentes: {len(order_queue)}.")
return " ".join(summary)

@ -0,0 +1,418 @@
import json
import logging
import re
import unicodedata
from datetime import datetime, timedelta
logger = logging.getLogger(__name__)
class EntityNormalizer:
def empty_extraction_payload(self) -> dict:
return {
"generic_memory": {},
"review_fields": {},
"review_management_fields": {},
"order_fields": {},
"cancel_order_fields": {},
"intents": {},
}
def empty_message_plan(self, message: str) -> dict:
return {
"orders": [
{
"domain": "general",
"message": (message or "").strip(),
"entities": self.empty_extraction_payload(),
}
]
}
def coerce_message_plan(self, payload, message: str) -> dict:
default = self.empty_message_plan(message=message)
if not isinstance(payload, dict):
return default
raw_orders = payload.get("orders")
if not isinstance(raw_orders, list):
return default
normalized_orders: list[dict] = []
for item in raw_orders:
if not isinstance(item, dict):
continue
domain = str(item.get("domain") or "general").strip().lower()
if domain not in {"review", "sales", "general"}:
domain = "general"
segment = str(item.get("message") or "").strip()
if not segment:
continue
normalized_orders.append(
{
"domain": domain,
"message": segment,
"entities": self.coerce_extraction_contract(item.get("entities")),
}
)
if not normalized_orders:
return default
return {"orders": normalized_orders}
def coerce_extraction_contract(self, payload) -> dict:
if not isinstance(payload, dict):
return self.empty_extraction_payload()
contract = self.empty_extraction_payload()
for key in contract:
value = payload.get(key)
contract[key] = value if isinstance(value, dict) else {}
if key not in payload:
logger.info("Extracao sem secao '%s'; usando vazio.", key)
return contract
def parse_json_object(self, text: str):
candidate = (text or "").strip()
if not candidate:
return None
if candidate.startswith("```"):
candidate = re.sub(r"^```(?:json)?\s*", "", candidate, flags=re.IGNORECASE)
candidate = re.sub(r"\s*```$", "", candidate)
try:
return json.loads(candidate)
except json.JSONDecodeError:
match = re.search(r"\{.*\}", candidate, flags=re.DOTALL)
if not match:
logger.warning("Extracao sem JSON valido no texto retornado.")
return None
try:
return json.loads(match.group(0))
except json.JSONDecodeError:
logger.warning("Extracao com JSON invalido apos recorte.")
return None
def normalize_text(self, text: str) -> str:
normalized = unicodedata.normalize("NFKD", text or "")
ascii_text = normalized.encode("ascii", "ignore").decode("ascii")
return ascii_text.lower()
def normalize_plate(self, value) -> str | None:
text = str(value or "").strip().upper()
if not text:
return None
if re.fullmatch(r"[A-Z]{3}[0-9][A-Z0-9][0-9]{2}", text) or re.fullmatch(r"[A-Z]{3}[0-9]{4}", text):
return text
compact = re.sub(r"[^A-Z0-9]", "", text)
if re.fullmatch(r"[A-Z]{3}[0-9][A-Z0-9][0-9]{2}", compact) or re.fullmatch(r"[A-Z]{3}[0-9]{4}", compact):
return compact
return None
def normalize_cpf(self, value) -> str | None:
digits = re.sub(r"\D", "", str(value or ""))
if len(digits) == 11:
return digits
return None
def normalize_positive_number(self, value) -> float | None:
if value is None:
return None
if isinstance(value, (int, float)):
number = float(value)
return number if number > 0 else None
text = self.normalize_text(str(value))
text = text.replace("r$", "").strip()
multiplier = 1000 if "mil" in text else 1
text = text.replace("mil", "").strip()
digits = re.sub(r"[^0-9,.\s]", "", text)
if not digits:
return None
numeric = digits.replace(".", "").replace(" ", "").replace(",", ".")
try:
number = float(numeric) * multiplier
return number if number > 0 else None
except ValueError:
return None
def normalize_vehicle_profile(self, value) -> list[str]:
if value is None:
return []
allowed = {"suv", "sedan", "hatch", "pickup"}
items = value if isinstance(value, list) else [value]
normalized: list[str] = []
for item in items:
marker = self.normalize_text(str(item)).strip()
if marker in allowed and marker not in normalized:
normalized.append(marker)
return normalized
def normalize_bool(self, value) -> bool | None:
if isinstance(value, bool):
return value
lowered = self.normalize_text(str(value or "")).strip()
if lowered in {"sim", "true", "1", "yes"}:
return True
if lowered in {"nao", "false", "0", "no"}:
return False
return None
def normalize_datetime_connector(self, text: str) -> str:
compact = " ".join(str(text or "").strip().split())
lowered = compact.lower()
marker = " as "
if marker in lowered:
index = lowered.index(marker)
return f"{compact[:index]} {compact[index + len(marker):]}".strip()
return compact
def try_parse_iso_datetime(self, text: str) -> datetime | None:
candidate = str(text or "").strip()
if not candidate:
return None
try:
return datetime.fromisoformat(candidate.replace("Z", "+00:00"))
except ValueError:
return None
def try_parse_datetime_with_formats(self, text: str, formats: tuple[str, ...]) -> datetime | None:
candidate = str(text or "").strip()
if not candidate:
return None
for fmt in formats:
try:
return datetime.strptime(candidate, fmt)
except ValueError:
continue
return None
def try_parse_review_absolute_datetime(self, text: str) -> datetime | None:
normalized = self.normalize_datetime_connector(text)
parsed = self.try_parse_iso_datetime(normalized)
if parsed is not None:
return parsed
day_first_formats = (
"%d/%m/%Y %H:%M",
"%d/%m/%Y %H:%M:%S",
"%d-%m-%Y %H:%M",
"%d-%m-%Y %H:%M:%S",
)
year_first_formats = (
"%Y/%m/%d %H:%M",
"%Y/%m/%d %H:%M:%S",
"%Y-%m-%d %H:%M",
"%Y-%m-%d %H:%M:%S",
)
return self.try_parse_datetime_with_formats(normalized, day_first_formats + year_first_formats)
def strip_token_edges(self, token: str) -> str:
cleaned = str(token or "").strip()
edge_chars = "[](){}<>,.;:!?\"'`"
while cleaned and cleaned[0] in edge_chars:
cleaned = cleaned[1:]
while cleaned and cleaned[-1] in edge_chars:
cleaned = cleaned[:-1]
return cleaned
def extract_hhmm_from_text(self, text: str) -> str | None:
cleaned = self.normalize_datetime_connector(text)
for token in cleaned.split():
normalized_token = self.strip_token_edges(token)
parts = normalized_token.split(":")
if len(parts) not in {2, 3}:
continue
if not all(part.isdigit() for part in parts):
continue
hour = int(parts[0])
minute = int(parts[1])
if 0 <= hour <= 23 and 0 <= minute <= 59:
return f"{hour:02d}:{minute:02d}"
return None
def normalize_review_datetime_text(self, value) -> str | None:
text = str(value or "").strip()
if not text:
return None
absolute_dt = self.try_parse_review_absolute_datetime(text)
if absolute_dt is not None:
return text
normalized = self.normalize_text(text)
day_offset = None
if "amanha" in normalized:
day_offset = 1
elif "hoje" in normalized:
day_offset = 0
if day_offset is None:
return text
time_text = self.extract_hhmm_from_text(normalized)
if not time_text:
return text
hour_text, minute_text = time_text.split(":")
target_date = datetime.now() + timedelta(days=day_offset)
return f"{target_date.strftime('%d/%m/%Y')} {int(hour_text):02d}:{int(minute_text):02d}"
def normalize_generic_fields(self, data) -> dict:
if not isinstance(data, dict):
return {}
extracted: dict = {}
plate = self.normalize_plate(data.get("placa"))
if plate:
extracted["placa"] = plate
cpf = self.normalize_cpf(data.get("cpf"))
if cpf:
extracted["cpf"] = cpf
budget = self.normalize_positive_number(data.get("orcamento_max"))
if budget:
extracted["orcamento_max"] = int(round(budget))
profile = self.normalize_vehicle_profile(data.get("perfil_veiculo"))
if profile:
extracted["perfil_veiculo"] = profile
return extracted
def normalize_review_fields(self, data) -> dict:
if not isinstance(data, dict):
return {}
extracted: dict = {}
plate = self.normalize_plate(data.get("placa"))
if plate:
extracted["placa"] = plate
date_time = self.normalize_review_datetime_text(data.get("data_hora"))
if date_time:
extracted["data_hora"] = date_time
model = str(data.get("modelo") or "").strip(" ,.;")
if model:
extracted["modelo"] = model.title()
year = self.normalize_positive_number(data.get("ano"))
if year:
year_int = int(round(year))
if 1900 <= year_int <= 2100:
extracted["ano"] = year_int
km = self.normalize_positive_number(data.get("km"))
if km:
extracted["km"] = int(round(km))
reviewed = self.normalize_bool(data.get("revisao_previa_concessionaria"))
if reviewed is not None:
extracted["revisao_previa_concessionaria"] = reviewed
return extracted
def tokenize_text(self, text: str) -> list[str]:
return [token for token in str(text or "").split() if token]
def clean_protocol_token(self, token: str) -> str:
cleaned = str(token or "").strip().upper()
edge_chars = "[](){}<>,.;:!?\"'`"
while cleaned and cleaned[0] in edge_chars:
cleaned = cleaned[1:]
while cleaned and cleaned[-1] in edge_chars:
cleaned = cleaned[:-1]
return cleaned
def is_valid_protocol_suffix(self, value: str) -> bool:
if not value or len(value) < 4:
return False
return all(char.isalnum() for char in value)
def normalize_review_protocol(self, value: str) -> str | None:
candidate = self.clean_protocol_token(value)
if not candidate.startswith("REV-"):
return None
parts = candidate.split("-")
if len(parts) != 3:
return None
prefix, date_part, suffix_part = parts
if prefix != "REV":
return None
if len(date_part) != 8 or not date_part.isdigit():
return None
try:
datetime.strptime(date_part, "%Y%m%d")
except ValueError:
return None
if not self.is_valid_protocol_suffix(suffix_part):
return None
return f"{prefix}-{date_part}-{suffix_part}"
def extract_review_protocol_from_text(self, text: str) -> str | None:
for token in self.tokenize_text(text):
normalized = self.normalize_review_protocol(token)
if normalized:
return normalized
return self.normalize_review_protocol(str(text or ""))
def normalize_review_management_fields(self, data) -> dict:
if not isinstance(data, dict):
return {}
extracted: dict = {}
raw_protocol = data.get("protocolo") or data.get("numero_protocolo") or data.get("codigo")
protocol = self.extract_review_protocol_from_text(str(raw_protocol or ""))
if protocol:
extracted["protocolo"] = protocol
new_datetime = self.normalize_review_datetime_text(data.get("nova_data_hora"))
if new_datetime:
extracted["nova_data_hora"] = new_datetime
reason = str(data.get("motivo") or "").strip(" .;")
if reason:
extracted["motivo"] = reason
return extracted
def normalize_order_fields(self, data) -> dict:
if not isinstance(data, dict):
return {}
extracted: dict = {}
cpf = self.normalize_cpf(data.get("cpf"))
if cpf:
extracted["cpf"] = cpf
value = self.normalize_positive_number(data.get("valor_veiculo"))
if value:
extracted["valor_veiculo"] = round(value, 2)
return extracted
def normalize_cancel_order_fields(self, data) -> dict:
if not isinstance(data, dict):
return {}
extracted: dict = {}
order_number = str(data.get("numero_pedido") or "").strip().upper()
if order_number and re.fullmatch(r"PED-[A-Z0-9\\-]+", order_number):
extracted["numero_pedido"] = order_number
reason = str(data.get("motivo") or "").strip(" .;")
if reason:
extracted["motivo"] = reason
return extracted
def normalize_intents(self, data) -> dict:
if not isinstance(data, dict):
data = {}
return {
"review_schedule": bool(self.normalize_bool(data.get("review_schedule"))),
"review_list": bool(self.normalize_bool(data.get("review_list"))),
"review_cancel": bool(self.normalize_bool(data.get("review_cancel"))),
"review_reschedule": bool(self.normalize_bool(data.get("review_reschedule"))),
"order_create": bool(self.normalize_bool(data.get("order_create"))),
"order_cancel": bool(self.normalize_bool(data.get("order_cancel"))),
}
def has_useful_extraction(self, extraction: dict | None) -> bool:
if not isinstance(extraction, dict):
return False
intents = self.normalize_intents(extraction.get("intents"))
if any(intents.values()):
return True
return any(
bool(extraction.get(key))
for key in ("generic_memory", "review_fields", "review_management_fields", "order_fields", "cancel_order_fields")
)
def has_operational_intent(self, extracted_entities: dict | None) -> bool:
if not isinstance(extracted_entities, dict):
return False
intents = self.normalize_intents(extracted_entities.get("intents"))
if any(intents.values()):
return True
return any(
bool(extracted_entities.get(key))
for key in ("review_fields", "review_management_fields", "order_fields", "cancel_order_fields")
)

@ -0,0 +1,160 @@
import logging
from app.services.ai.llm_service import LLMService
from app.services.orchestration.entity_normalizer import EntityNormalizer
logger = logging.getLogger(__name__)
class MessagePlanner:
def __init__(self, llm: LLMService, normalizer: EntityNormalizer):
self.llm = llm
self.normalizer = normalizer
async def extract_message_plan(self, message: str, user_id: int | None) -> dict:
prompt = (
"Analise a mensagem e retorne APENAS JSON valido com roteamento e entidades por pedido.\n"
"Sem markdown e sem texto extra.\n\n"
"Formato:\n"
"{\n"
' "orders": [\n'
" {\n"
' "domain": "review|sales|general",\n'
' "message": "trecho literal do pedido",\n'
' "entities": {\n'
' "generic_memory": {"placa": null, "cpf": null, "orcamento_max": null, "perfil_veiculo": []},\n'
' "review_fields": {"placa": null, "data_hora": null, "modelo": null, "ano": null, "km": null, "revisao_previa_concessionaria": null},\n'
' "review_management_fields": {"protocolo": null, "nova_data_hora": null, "motivo": null},\n'
' "order_fields": {"cpf": null, "valor_veiculo": null},\n'
' "cancel_order_fields": {"numero_pedido": null, "motivo": null},\n'
' "intents": {"review_schedule": false, "review_list": false, "review_cancel": false, "review_reschedule": false, "order_create": false, "order_cancel": false}\n'
" }\n"
" }\n"
" ]\n"
"}\n\n"
"Regras:\n"
"- Se houver mais de um pedido operacional, separe em itens distintos em ordem de aparicao.\n"
"- Se nao houver pedido operacional, use domain='general' com a mensagem inteira.\n"
"- Mantenha cada message curta e fiel ao texto do usuario.\n\n"
f"Contexto: user_id={user_id if user_id is not None else 'anonimo'}\n"
f"Mensagem do usuario: {message}"
)
default = self.normalizer.empty_message_plan(message=message)
try:
result = await self.llm.generate_response(message=prompt, tools=[])
text = (result.get("response") or "").strip()
payload = self.normalizer.parse_json_object(text)
if not isinstance(payload, dict):
logger.warning("Plano de mensagem invalido (nao JSON objeto). user_id=%s", user_id)
return default
return self.normalizer.coerce_message_plan(payload=payload, message=message)
except Exception:
logger.exception("Falha ao extrair plano da mensagem com LLM. user_id=%s", user_id)
return default
async def extract_routing(self, message: str, user_id: int | None) -> dict:
plan = await self.extract_message_plan(message=message, user_id=user_id)
return {
"orders": [
{
"domain": item.get("domain", "general"),
"message": item.get("message", ""),
}
for item in plan.get("orders", [])
]
}
async def extract_entities(self, message: str, user_id: int | None) -> dict:
user_context = f"user_id={user_id}" if user_id is not None else "user_id=anonimo"
prompt = (
"Extraia entidades da mensagem do usuario e retorne APENAS JSON valido.\n"
"Nao use markdown, nao adicione texto antes/depois, nao invente dados ausentes.\n"
"Se nao houver valor, use null ou lista vazia.\n\n"
"Formato obrigatorio:\n"
"{\n"
' "generic_memory": {\n'
' "placa": null,\n'
' "cpf": null,\n'
' "orcamento_max": null,\n'
' "perfil_veiculo": []\n'
" },\n"
' "review_fields": {\n'
' "placa": null,\n'
' "data_hora": null,\n'
' "modelo": null,\n'
' "ano": null,\n'
' "km": null,\n'
' "revisao_previa_concessionaria": null\n'
" },\n"
' "review_management_fields": {\n'
' "protocolo": null,\n'
' "nova_data_hora": null,\n'
' "motivo": null\n'
" },\n"
' "order_fields": {\n'
' "cpf": null,\n'
' "valor_veiculo": null\n'
" },\n"
' "cancel_order_fields": {\n'
' "numero_pedido": null,\n'
' "motivo": null\n'
" },\n"
' "intents": {\n'
' "review_schedule": false,\n'
' "review_list": false,\n'
' "review_cancel": false,\n'
' "review_reschedule": false,\n'
' "order_create": false,\n'
' "order_cancel": false\n'
" }\n"
"}\n\n"
f"Contexto: {user_context}\n"
f"Mensagem do usuario: {message}"
)
default = self.normalizer.empty_extraction_payload()
try:
result = await self.llm.generate_response(message=prompt, tools=[])
text = (result.get("response") or "").strip()
if not text:
logger.warning("Extracao vazia do LLM. user_id=%s", user_id)
return default
payload = self.normalizer.parse_json_object(text)
if not isinstance(payload, dict):
logger.warning("Extracao invalida (nao JSON objeto). user_id=%s", user_id)
return default
return self.normalize_extraction_payload(payload)
except Exception:
logger.exception("Falha ao extrair entidades com LLM. user_id=%s", user_id)
return default
def resolve_entities_for_message_plan(self, message_plan: dict, routed_message: str) -> dict:
default = self.normalizer.empty_extraction_payload()
if not isinstance(message_plan, dict):
return default
target = (routed_message or "").strip()
raw_orders = message_plan.get("orders")
if not isinstance(raw_orders, list):
return default
for item in raw_orders:
if not isinstance(item, dict):
continue
segment = str(item.get("message") or "").strip()
if segment != target:
continue
return self.normalize_extraction_payload(item.get("entities"))
return default
def normalize_extraction_payload(self, payload) -> dict:
coerced = self.normalizer.coerce_extraction_contract(payload)
return {
"generic_memory": self.normalizer.normalize_generic_fields(coerced.get("generic_memory")),
"review_fields": self.normalizer.normalize_review_fields(coerced.get("review_fields")),
"review_management_fields": self.normalizer.normalize_review_management_fields(coerced.get("review_management_fields")),
"order_fields": self.normalizer.normalize_order_fields(coerced.get("order_fields")),
"cancel_order_fields": self.normalizer.normalize_cancel_order_fields(coerced.get("cancel_order_fields")),
"intents": self.normalizer.normalize_intents(coerced.get("intents")),
}

File diff suppressed because it is too large Load Diff

@ -0,0 +1,28 @@
from fastapi import HTTPException
from app.services.orchestration.orchestrator_config import DETERMINISTIC_RESPONSE_TOOLS
from app.services.orchestration.response_formatter import fallback_format_tool_result
class ToolExecutor:
def __init__(self, registry):
self.registry = registry
async def execute(self, tool_name: str, arguments: dict, user_id: int | None = None):
return await self.registry.execute(tool_name, arguments, user_id=user_id)
def should_use_deterministic_response(self, tool_name: str) -> bool:
return tool_name in DETERMINISTIC_RESPONSE_TOOLS
def http_exception_detail(self, exc: HTTPException) -> str:
detail = exc.detail
if isinstance(detail, str):
return detail
if isinstance(detail, dict):
message = str(detail.get("message") or "").strip()
if message:
return message
return "Nao foi possivel concluir a operacao solicitada."
def fallback_format_tool_result(self, tool_name: str, tool_result) -> str:
return fallback_format_tool_result(tool_name=tool_name, tool_result=tool_result)
Loading…
Cancel
Save