🧠 feat(orquestrador): deixar o modelo decidir o turno e limitar regex a formalizacao tecnica

Introduz o contrato TurnDecision e a extracao estruturada por turno no planner para que intent, domain, action, selecao e resposta venham do modelo, com validacao Pydantic e fallback previsivel quando o JSON vier invalido.

Tambem extrai a normalizacao tecnica para um modulo dedicado e passa a usar regex apenas para formalizar CPF, placa, protocolos, datas e outros formatos estruturados, reduzindo heuristicas semanticas dentro do normalizador, da policy e dos fluxos de revisao.
main
parent d27ebf798d
commit 8cf79174ee

@ -10,22 +10,28 @@ from app.services.orchestration.orchestrator_config import (
)
# Esse mixin concentra os fluxos incrementais de revisao e pos-venda.
class ReviewFlowMixin:
def _decision_intent(self, turn_decision: dict | None) -> str:
return str((turn_decision or {}).get("intent") or "").strip().lower()
async def _try_handle_review_management(
self,
message: str,
user_id: int | None,
extracted_fields: dict | None = None,
intents: dict | None = None,
turn_decision: dict | None = None,
) -> str | None:
if user_id is None:
return None
normalized_intents = self._normalize_intents(intents)
draft = self.state.get_entry("pending_review_management_drafts", user_id, expire=True)
decision_intent = self._decision_intent(turn_decision)
has_list_intent = normalized_intents.get("review_list", False)
has_cancel_intent = normalized_intents.get("review_cancel", False)
has_reschedule_intent = normalized_intents.get("review_reschedule", False)
has_list_intent = decision_intent == "review_list" or normalized_intents.get("review_list", False)
has_cancel_intent = decision_intent == "review_cancel" or normalized_intents.get("review_cancel", False)
has_reschedule_intent = decision_intent == "review_reschedule" or normalized_intents.get("review_reschedule", False)
if has_list_intent:
self._reset_pending_review_states(user_id=user_id)
@ -147,6 +153,8 @@ class ReviewFlowMixin:
def _store_last_review_package(self, user_id: int | None, payload: dict | None) -> None:
if user_id is None or not isinstance(payload, dict):
return
# Guarda um pacote reutilizavel do ultimo veiculo informado
# para reduzir repeticao em novos agendamentos.
package = {
"placa": payload.get("placa"),
"modelo": payload.get("modelo"),
@ -182,14 +190,17 @@ class ReviewFlowMixin:
user_id: int | None,
extracted_fields: dict | None = None,
intents: dict | None = None,
turn_decision: dict | None = None,
) -> str | None:
if user_id is None:
return None
normalized_intents = self._normalize_intents(intents)
has_intent = normalized_intents.get("review_schedule", False)
decision_intent = self._decision_intent(turn_decision)
has_intent = decision_intent == "review_schedule" or normalized_intents.get("review_schedule", False)
has_management_intent = (
normalized_intents.get("review_list", False)
decision_intent in {"review_list", "review_cancel", "review_reschedule"}
or normalized_intents.get("review_list", False)
or normalized_intents.get("review_cancel", False)
or normalized_intents.get("review_reschedule", False)
)
@ -245,7 +256,8 @@ class ReviewFlowMixin:
draft
and not has_intent
and (
normalized_intents.get("order_create", False)
decision_intent in {"order_create", "order_cancel"}
or normalized_intents.get("order_create", False)
or normalized_intents.get("order_cancel", False)
)
and not extracted
@ -257,6 +269,8 @@ class ReviewFlowMixin:
return None
if draft is None:
# Cria um draft com TTL para permitir coleta do agendamento
# em varias mensagens sem perder o progresso.
draft = {
"payload": {},
"expires_at": datetime.utcnow() + timedelta(minutes=PENDING_REVIEW_DRAFT_TTL_MINUTES),

@ -17,6 +17,19 @@ class ConversationPolicy:
def __init__(self, service: "OrquestradorService"):
self.service = service
def _decision_action(self, turn_decision: dict | None) -> str:
return str((turn_decision or {}).get("action") or "").strip().lower()
def _decision_intent(self, turn_decision: dict | None) -> str:
return str((turn_decision or {}).get("intent") or "").strip().lower()
def _decision_domain(self, turn_decision: dict | None) -> str:
return str((turn_decision or {}).get("domain") or "").strip().lower()
def _decision_selection_index(self, turn_decision: dict | None) -> int | None:
value = (turn_decision or {}).get("selection_index")
return value if isinstance(value, int) and value >= 0 else None
# Essa função serve para reaproveitar informações já informadas antes, evitando pedir novamente ao usuário.
def try_prefill_review_fields_from_memory(self, user_id: int | None, payload: dict) -> None:
if user_id is None:
@ -289,7 +302,11 @@ class ConversationPolicy:
# Ajuda a perceber quando o usuário talvez tenha mudado de assunto sem responder à pergunta de escolha
def looks_like_fresh_operational_request(self, message: str) -> bool:
def looks_like_fresh_operational_request(self, message: str, turn_decision: dict | None = None) -> bool:
decision_domain = self._decision_domain(turn_decision)
decision_intent = self._decision_intent(turn_decision)
if decision_domain in {"review", "sales"} or decision_intent not in {"", "general"}:
return True
normalized = self.service.normalizer.normalize_text(message).strip()
if len(normalized) < 15:
return False
@ -360,7 +377,16 @@ class ConversationPolicy:
# Interpreta a resposta do usuário na etapa de seleção.
def detect_selected_order_index(self, message: str, orders: list[dict]) -> tuple[int | None, bool]:
def detect_selected_order_index(
self,
message: str,
orders: list[dict],
turn_decision: dict | None = None,
) -> tuple[int | None, bool]:
selection_index = self._decision_selection_index(turn_decision)
if selection_index is not None and 0 <= selection_index < len(orders):
return selection_index, False
normalized = self.strip_choice_message(self.service.normalizer.normalize_text(message))
indifferent_tokens = {
"tanto faz",
@ -378,6 +404,12 @@ class ConversationPolicy:
if normalized in {"2", "segundo", "segunda", "opcao 2", "acao 2", "pedido 2"}:
return 1, False
decision_domain = self._decision_domain(turn_decision)
if len(orders) >= 2 and decision_domain in {"review", "sales"}:
matches = [index for index, order in enumerate(orders) if order.get("domain") == decision_domain]
if len(matches) == 1:
return matches[0], False
review_matches = [index for index, order in enumerate(orders) if order.get("domain") == "review"]
sales_matches = [index for index, order in enumerate(orders) if order.get("domain") == "sales"]
has_review_signal = self.contains_any_term(normalized, {"revisao", "agendamento", "agendar", "remarcar", "pos venda"})
@ -391,7 +423,12 @@ class ConversationPolicy:
# É a função que efetivamente trata a resposta do usuário quando você perguntou “qual pedido quer fazer primeiro?”.
async def try_resolve_pending_order_selection(self, message: str, user_id: int | None) -> str | None:
async def try_resolve_pending_order_selection(
self,
message: str,
user_id: int | None,
turn_decision: dict | None = None,
) -> str | None:
context = self.service._get_user_context(user_id)
if not context:
return None
@ -406,16 +443,21 @@ class ConversationPolicy:
context["pending_order_selection"] = None
return None
if self.is_order_selection_reset_message(message):
decision_action = self._decision_action(turn_decision)
if decision_action == "clear_context" or self.is_order_selection_reset_message(message):
self.service._clear_user_conversation_state(user_id=user_id)
cleaned_message = self.remove_order_selection_reset_prefix(message)
if not cleaned_message:
return "Tudo bem. Limpei o contexto atual. Pode me dizer o que voce quer fazer agora?"
return await self.service.handle_message(cleaned_message, user_id=user_id)
selected_index, auto_selected = self.detect_selected_order_index(message=message, orders=orders)
selected_index, auto_selected = self.detect_selected_order_index(
message=message,
orders=orders,
turn_decision=turn_decision,
)
if selected_index is None:
if self.looks_like_fresh_operational_request(message):
if self.looks_like_fresh_operational_request(message, turn_decision=turn_decision):
context["pending_order_selection"] = None
return None
return self.render_order_selection_prompt(orders)
@ -548,19 +590,30 @@ class ConversationPolicy:
# Detecta comandos de continuação.
def is_context_switch_confirmation(self, message: str) -> bool:
def is_context_switch_confirmation(self, message: str, turn_decision: dict | None = None) -> bool:
if self._decision_action(turn_decision) in {"continue_queue", "cancel_active_flow", "clear_context", "discard_queue"}:
return True
if self._decision_domain(turn_decision) in {"review", "sales"}:
return True
return self.service._is_affirmative_message(message) or self.service._is_negative_message(message)
# Executa o próximo pedido da fila quando o usuário disser “continuar”.
def is_continue_queue_message(self, message: str) -> bool:
def is_continue_queue_message(self, message: str, turn_decision: dict | None = None) -> bool:
if self._decision_action(turn_decision) == "continue_queue" or self._decision_intent(turn_decision) == "queue_continue":
return True
normalized = self.service.normalizer.normalize_text(message).strip()
normalized = re.sub(r"[.!?,;:]+$", "", normalized)
return normalized in {"continuar", "pode continuar", "seguir", "pode seguir", "proximo", "segue"}
# Executa o próximo pedido da fila quando o usuário disser “continuar”.
async def try_continue_queued_order(self, message: str, user_id: int | None) -> str | None:
async def try_continue_queued_order(
self,
message: str,
user_id: int | None,
turn_decision: dict | None = None,
) -> str | None:
context = self.service._get_user_context(user_id)
if not context:
return None
@ -574,10 +627,14 @@ class ConversationPolicy:
if not queued_message:
return None
if self.service._is_negative_message(message):
decision_action = self._decision_action(turn_decision)
if self.service._is_negative_message(message) and decision_action != "continue_queue":
context["pending_switch"] = None
return "Tudo bem. Mantive o proximo pedido fora da fila por enquanto."
if not (self.is_continue_queue_message(message) or self.service._is_affirmative_message(message)):
if not (
self.is_continue_queue_message(message, turn_decision=turn_decision)
or self.service._is_affirmative_message(message)
):
return None
target_domain = str(pending_switch.get("target_domain") or "general")
@ -627,7 +684,13 @@ class ConversationPolicy:
# Controla a confirmação de “você quer mesmo sair deste assunto e ir para outro?”.
def handle_context_switch(self, message: str, user_id: int | None, target_domain_hint: str = "general") -> str | None:
def handle_context_switch(
self,
message: str,
user_id: int | None,
target_domain_hint: str = "general",
turn_decision: dict | None = None,
) -> str | None:
context = self.service._get_user_context(user_id)
if not context:
return None
@ -635,8 +698,8 @@ class ConversationPolicy:
if pending_switch:
if pending_switch["expires_at"] < datetime.utcnow():
context["pending_switch"] = None
elif self.is_context_switch_confirmation(message):
if self.service._is_affirmative_message(message):
elif self.is_context_switch_confirmation(message, turn_decision=turn_decision):
if self.service._is_affirmative_message(message) or self._decision_domain(turn_decision) == pending_switch["target_domain"]:
target_domain = pending_switch["target_domain"]
self.apply_domain_switch(user_id=user_id, target_domain=target_domain)
return self.render_context_switched_message(target_domain=target_domain)

@ -1,14 +1,22 @@
import json
import logging
import re
import unicodedata
from datetime import datetime, timedelta
from datetime import datetime
from pydantic import ValidationError
from app.services.orchestration import technical_normalizer
from app.services.orchestration.turn_decision import TurnDecision
logger = logging.getLogger(__name__)
# Essa classe concentra normalizacao tecnica e coercoes estruturadas.
# A semantica conversacional idealmente vem do modelo, nao daqui.
class EntityNormalizer:
def empty_turn_decision(self) -> dict:
return TurnDecision().model_dump()
def empty_extraction_payload(self) -> dict:
return {
"generic_memory": {},
@ -92,163 +100,66 @@ class EntityNormalizer:
logger.warning("Extracao com JSON invalido apos recorte.")
return None
def coerce_turn_decision(self, payload) -> dict:
if not isinstance(payload, dict):
return self.empty_turn_decision()
try:
model = TurnDecision.model_validate(payload)
except ValidationError:
logger.warning("Decisao de turno invalida; usando fallback estruturado.")
return self.empty_turn_decision()
normalized_entities = {
"generic_memory": self.normalize_generic_fields(model.entities.generic_memory),
"review_fields": self.normalize_review_fields(model.entities.review_fields),
"review_management_fields": self.normalize_review_management_fields(model.entities.review_management_fields),
"order_fields": self.normalize_order_fields(model.entities.order_fields),
"cancel_order_fields": self.normalize_cancel_order_fields(model.entities.cancel_order_fields),
}
dumped = model.model_dump()
dumped["entities"] = normalized_entities
dumped["tool_arguments"] = dumped.get("tool_arguments") or {}
dumped["missing_fields"] = [str(field) for field in dumped.get("missing_fields") or [] if str(field).strip()]
return dumped
def normalize_text(self, text: str) -> str:
normalized = unicodedata.normalize("NFKD", text or "")
ascii_text = normalized.encode("ascii", "ignore").decode("ascii")
return ascii_text.lower()
return technical_normalizer.normalize_text(text)
def normalize_plate(self, value) -> str | None:
text = str(value or "").strip().upper()
if not text:
return None
if re.fullmatch(r"[A-Z]{3}[0-9][A-Z0-9][0-9]{2}", text) or re.fullmatch(r"[A-Z]{3}[0-9]{4}", text):
return text
compact = re.sub(r"[^A-Z0-9]", "", text)
if re.fullmatch(r"[A-Z]{3}[0-9][A-Z0-9][0-9]{2}", compact) or re.fullmatch(r"[A-Z]{3}[0-9]{4}", compact):
return compact
return None
return technical_normalizer.normalize_plate(value)
def normalize_cpf(self, value) -> str | None:
digits = re.sub(r"\D", "", str(value or ""))
if len(digits) == 11:
return digits
return None
return technical_normalizer.normalize_cpf(value)
def normalize_positive_number(self, value) -> float | None:
if value is None:
return None
if isinstance(value, (int, float)):
number = float(value)
return number if number > 0 else None
text = self.normalize_text(str(value))
text = text.replace("r$", "").strip()
multiplier = 1000 if "mil" in text else 1
text = text.replace("mil", "").strip()
digits = re.sub(r"[^0-9,.\s]", "", text)
if not digits:
return None
numeric = digits.replace(".", "").replace(" ", "").replace(",", ".")
try:
number = float(numeric) * multiplier
return number if number > 0 else None
except ValueError:
return None
return technical_normalizer.normalize_positive_number(value)
def normalize_vehicle_profile(self, value) -> list[str]:
if value is None:
return []
allowed = {"suv", "sedan", "hatch", "pickup"}
items = value if isinstance(value, list) else [value]
normalized: list[str] = []
for item in items:
marker = self.normalize_text(str(item)).strip()
if marker in allowed and marker not in normalized:
normalized.append(marker)
return normalized
return technical_normalizer.normalize_vehicle_profile(value)
def normalize_bool(self, value) -> bool | None:
if isinstance(value, bool):
return value
lowered = self.normalize_text(str(value or "")).strip()
if lowered in {"sim", "true", "1", "yes"}:
return True
if lowered in {"nao", "false", "0", "no"}:
return False
return None
return technical_normalizer.normalize_bool(value)
def normalize_datetime_connector(self, text: str) -> str:
compact = " ".join(str(text or "").strip().split())
return re.sub(r"\s+[aàáâã]s\s+", " ", compact, flags=re.IGNORECASE).strip()
return technical_normalizer.normalize_datetime_connector(text)
def try_parse_iso_datetime(self, text: str) -> datetime | None:
candidate = str(text or "").strip()
if not candidate:
return None
try:
return datetime.fromisoformat(candidate.replace("Z", "+00:00"))
except ValueError:
return None
return technical_normalizer.try_parse_iso_datetime(text)
def try_parse_datetime_with_formats(self, text: str, formats: tuple[str, ...]) -> datetime | None:
candidate = str(text or "").strip()
if not candidate:
return None
for fmt in formats:
try:
return datetime.strptime(candidate, fmt)
except ValueError:
continue
return None
return technical_normalizer.try_parse_datetime_with_formats(text, formats)
def try_parse_review_absolute_datetime(self, text: str) -> datetime | None:
normalized = self.normalize_datetime_connector(text)
parsed = self.try_parse_iso_datetime(normalized)
if parsed is not None:
return parsed
day_first_formats = (
"%d/%m/%Y %H:%M",
"%d/%m/%Y %H:%M:%S",
"%d-%m-%Y %H:%M",
"%d-%m-%Y %H:%M:%S",
)
year_first_formats = (
"%Y/%m/%d %H:%M",
"%Y/%m/%d %H:%M:%S",
"%Y-%m-%d %H:%M",
"%Y-%m-%d %H:%M:%S",
)
return self.try_parse_datetime_with_formats(normalized, day_first_formats + year_first_formats)
return technical_normalizer.try_parse_review_absolute_datetime(text)
def strip_token_edges(self, token: str) -> str:
cleaned = str(token or "").strip()
edge_chars = "[](){}<>,.;:!?\"'`"
while cleaned and cleaned[0] in edge_chars:
cleaned = cleaned[1:]
while cleaned and cleaned[-1] in edge_chars:
cleaned = cleaned[:-1]
return cleaned
return technical_normalizer.strip_token_edges(token)
def extract_hhmm_from_text(self, text: str) -> str | None:
cleaned = self.normalize_datetime_connector(text)
for token in cleaned.split():
normalized_token = self.strip_token_edges(token)
parts = normalized_token.split(":")
if len(parts) not in {2, 3}:
continue
if not all(part.isdigit() for part in parts):
continue
hour = int(parts[0])
minute = int(parts[1])
if 0 <= hour <= 23 and 0 <= minute <= 59:
return f"{hour:02d}:{minute:02d}"
return None
return technical_normalizer.extract_hhmm_from_text(text)
def normalize_review_datetime_text(self, value) -> str | None:
text = str(value or "").strip()
if not text:
return None
absolute_dt = self.try_parse_review_absolute_datetime(text)
if absolute_dt is not None:
return text
normalized = self.normalize_text(text)
day_offset = None
if "amanha" in normalized:
day_offset = 1
elif "hoje" in normalized:
day_offset = 0
if day_offset is None:
return text
time_text = self.extract_hhmm_from_text(normalized)
if not time_text:
return text
hour_text, minute_text = time_text.split(":")
target_date = datetime.now() + timedelta(days=day_offset)
return f"{target_date.strftime('%d/%m/%Y')} {int(hour_text):02d}:{int(minute_text):02d}"
return technical_normalizer.normalize_review_datetime_text(value)
def normalize_generic_fields(self, data) -> dict:
if not isinstance(data, dict):
@ -295,48 +206,19 @@ class EntityNormalizer:
return extracted
def tokenize_text(self, text: str) -> list[str]:
return [token for token in str(text or "").split() if token]
return technical_normalizer.tokenize_text(text)
def clean_protocol_token(self, token: str) -> str:
cleaned = str(token or "").strip().upper()
edge_chars = "[](){}<>,.;:!?\"'`"
while cleaned and cleaned[0] in edge_chars:
cleaned = cleaned[1:]
while cleaned and cleaned[-1] in edge_chars:
cleaned = cleaned[:-1]
return cleaned
return technical_normalizer.clean_protocol_token(token)
def is_valid_protocol_suffix(self, value: str) -> bool:
if not value or len(value) < 4:
return False
return all(char.isalnum() for char in value)
return technical_normalizer.is_valid_protocol_suffix(value)
def normalize_review_protocol(self, value: str) -> str | None:
candidate = self.clean_protocol_token(value)
if not candidate.startswith("REV-"):
return None
parts = candidate.split("-")
if len(parts) != 3:
return None
prefix, date_part, suffix_part = parts
if prefix != "REV":
return None
if len(date_part) != 8 or not date_part.isdigit():
return None
try:
datetime.strptime(date_part, "%Y%m%d")
except ValueError:
return None
if not self.is_valid_protocol_suffix(suffix_part):
return None
return f"{prefix}-{date_part}-{suffix_part}"
return technical_normalizer.normalize_review_protocol(value)
def extract_review_protocol_from_text(self, text: str) -> str | None:
for token in self.tokenize_text(text):
normalized = self.normalize_review_protocol(token)
if normalized:
return normalized
return self.normalize_review_protocol(str(text or ""))
return technical_normalizer.extract_review_protocol_from_text(text)
def normalize_review_management_fields(self, data) -> dict:
if not isinstance(data, dict):
@ -373,8 +255,8 @@ class EntityNormalizer:
if not isinstance(data, dict):
return {}
extracted: dict = {}
order_number = str(data.get("numero_pedido") or "").strip().upper()
if order_number and re.fullmatch(r"PED-[A-Z0-9\\-]+", order_number):
order_number = technical_normalizer.normalize_order_number(data.get("numero_pedido"))
if order_number:
extracted["numero_pedido"] = order_number
reason = str(data.get("motivo") or "").strip(" .;")
if reason:

@ -1,12 +1,16 @@
import logging
import json
from app.services.ai.llm_service import LLMService
from app.services.orchestration.entity_normalizer import EntityNormalizer
from app.services.orchestration.turn_decision import TurnDecision
logger = logging.getLogger(__name__)
# Esse componente pede ao modelo contratos estruturados
# para roteamento, extracao tecnica e decisao por turno.
class MessagePlanner:
def __init__(self, llm: LLMService, normalizer: EntityNormalizer):
self.llm = llm
@ -130,6 +134,46 @@ class MessagePlanner:
logger.exception("Falha ao extrair entidades com LLM. user_id=%s", user_id)
return default
async def extract_turn_decision(self, message: str, user_id: int | None) -> dict:
user_context = f"user_id={user_id}" if user_id is not None else "user_id=anonimo"
default = self.normalizer.empty_turn_decision()
schema_example = json.dumps(TurnDecision().model_dump(), ensure_ascii=True)
prompt = (
"Analise a mensagem do usuario e retorne APENAS JSON valido seguindo o contrato de decisao por turno.\n"
"Nao use markdown. Nao escreva texto fora do JSON. Nao invente dados ausentes.\n"
"Use regex apenas para formatos tecnicos; a decisao semantica deve vir do modelo.\n\n"
"Contrato obrigatorio:\n"
f"{schema_example}\n\n"
"Regras:\n"
"- 'domain' deve ser review, sales ou general.\n"
"- 'intent' deve refletir a intencao principal do turno.\n"
"- 'action' deve ser uma das acoes do contrato.\n"
"- 'entities' deve manter as secoes generic_memory, review_fields, review_management_fields, order_fields e cancel_order_fields.\n"
"- Se faltar dado para continuar um fluxo, use action='ask_missing_fields' e preencha 'missing_fields' e 'response_to_user'.\n"
"- Se o usuario estiver escolhendo entre pedidos enfileirados (ex.: '1', '2', 'o segundo'), preencha 'selection_index' com base zero.\n"
"- Se for necessaria uma tool de orquestracao, use action compativel e preencha 'tool_name' e 'tool_arguments' quando apropriado.\n"
"- Se nao houver acao operacional, use action='answer_user'.\n\n"
f"Contexto: {user_context}\n"
f"Mensagem do usuario: {message}"
)
# Faz um retry curto quando o modelo devolve JSON invalido,
# evitando loops longos e mantendo fallback previsivel.
for attempt in range(2):
try:
result = await self.llm.generate_response(message=prompt, tools=[])
text = (result.get("response") or "").strip()
payload = self.normalizer.parse_json_object(text)
decision = self.normalizer.coerce_turn_decision(payload)
if decision != default:
return decision
if attempt == 0:
logger.warning("Decisao estruturada invalida; repetindo uma vez. user_id=%s", user_id)
except Exception:
logger.exception("Falha ao extrair decisao por turno com LLM. user_id=%s", user_id)
break
return default
def resolve_entities_for_message_plan(self, message_plan: dict, routed_message: str) -> dict:
default = self.normalizer.empty_extraction_payload()
if not isinstance(message_plan, dict):

@ -29,6 +29,8 @@ from app.services.tools.tool_registry import ToolRegistry
logger = logging.getLogger(__name__)
# Coordenador principal do turno conversacional:
# atualiza estado, pede decisoes ao modelo, continua fluxos e executa tools.
class OrquestradorService(ReviewFlowMixin, OrderFlowMixin):
def __init__(
self,
@ -66,13 +68,24 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin):
)
self._upsert_user_context(user_id=user_id)
# Faz uma leitura inicial do turno para ajudar a policy
# com fila, troca de contexto e comandos globais.
early_turn_decision = await self._extract_turn_decision_with_llm(
message=message,
user_id=user_id,
)
pending_order_selection = await self._try_resolve_pending_order_selection(
message=message,
user_id=user_id,
turn_decision=early_turn_decision,
)
if pending_order_selection:
return pending_order_selection
queued_followup = await self._try_continue_queued_order(message=message, user_id=user_id)
queued_followup = await self._try_continue_queued_order(
message=message,
user_id=user_id,
turn_decision=early_turn_decision,
)
if queued_followup:
return queued_followup
@ -106,21 +119,39 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin):
message_plan=message_plan,
routed_message=routing_message,
)
if not self._has_useful_extraction(extracted_entities):
extracted_entities = await self._extract_entities_with_llm(
# Depois do roteamento para um unico pedido, pede a decisao
# estruturada do turno final que sera executado.
turn_decision = await self._extract_turn_decision_with_llm(
message=routing_message,
user_id=user_id,
)
if self._has_useful_turn_decision(turn_decision):
extracted_entities = self._merge_extracted_entities(
extracted_entities,
self._extracted_entities_from_turn_decision(turn_decision),
)
else:
llm_extracted_entities = await self._extract_entities_with_llm(
message=routing_message,
user_id=user_id,
)
extracted_entities = self._merge_extracted_entities(
extracted_entities,
llm_extracted_entities,
)
self._capture_generic_memory(
user_id=user_id,
llm_generic_fields=extracted_entities.get("generic_memory", {}),
)
domain_hint = self._domain_from_intents(extracted_entities.get("intents", {}))
domain_hint = self._domain_from_turn_decision(turn_decision)
if domain_hint == "general":
domain_hint = self._domain_from_intents(extracted_entities.get("intents", {}))
context_switch_response = self._handle_context_switch(
message=routing_message,
user_id=user_id,
target_domain_hint=domain_hint,
turn_decision=turn_decision,
)
if context_switch_response:
return await finish(context_switch_response, queue_notice=queue_notice)
@ -130,6 +161,7 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin):
orchestration_override = await self._try_execute_orchestration_control_tool(
message=routing_message,
user_id=user_id,
turn_decision=turn_decision,
extracted_entities=extracted_entities,
queue_notice=queue_notice,
finish=finish,
@ -137,11 +169,29 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin):
if orchestration_override:
return orchestration_override
decision_action = str(turn_decision.get("action") or "")
decision_response = str(turn_decision.get("response_to_user") or "").strip()
if decision_action == "ask_missing_fields" and decision_response:
return await finish(decision_response, queue_notice=queue_notice)
if decision_action == "answer_user" and decision_response:
return await finish(decision_response, queue_notice=queue_notice)
planned_tool_response = await self._try_execute_business_tool_from_turn_decision(
message=routing_message,
user_id=user_id,
turn_decision=turn_decision,
queue_notice=queue_notice,
finish=finish,
)
if planned_tool_response:
return planned_tool_response
review_management_response = await self._try_handle_review_management(
message=routing_message,
user_id=user_id,
extracted_fields=extracted_entities.get("review_management_fields", {}),
intents=extracted_entities.get("intents", {}),
intents={},
turn_decision=turn_decision,
)
if review_management_response:
return await finish(review_management_response, queue_notice=queue_notice)
@ -161,7 +211,8 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin):
message=routing_message,
user_id=user_id,
extracted_fields=extracted_entities.get("review_fields", {}),
intents=extracted_entities.get("intents", {}),
intents={},
turn_decision=turn_decision,
)
if review_response:
return await finish(review_response, queue_notice=queue_notice)
@ -170,7 +221,8 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin):
message=routing_message,
user_id=user_id,
extracted_fields=extracted_entities.get("cancel_order_fields", {}),
intents=extracted_entities.get("intents", {}),
intents={},
turn_decision=turn_decision,
)
if cancel_order_response:
return await finish(cancel_order_response, queue_notice=queue_notice)
@ -179,7 +231,8 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin):
message=routing_message,
user_id=user_id,
extracted_fields=extracted_entities.get("order_fields", {}),
intents=extracted_entities.get("intents", {}),
intents={},
turn_decision=turn_decision,
)
if order_response:
return await finish(order_response, queue_notice=queue_notice)
@ -221,6 +274,15 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin):
user_id=user_id,
)
return await finish(self._http_exception_detail(exc), queue_notice=queue_notice)
stock_suggestion_response = await self._maybe_build_stock_suggestion_response(
tool_name=tool_name,
arguments=arguments,
tool_result=tool_result,
user_id=user_id,
)
if stock_suggestion_response:
return await finish(stock_suggestion_response, queue_notice=queue_notice)
self._capture_tool_result_context(
tool_name=tool_name,
tool_result=tool_result,
@ -266,10 +328,42 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin):
self,
message: str,
user_id: int | None,
turn_decision: dict | None,
extracted_entities: dict,
queue_notice: str | None,
finish,
) -> str | None:
decision = turn_decision or {}
decision_action = str(decision.get("action") or "").strip()
decision_tool_name = str(decision.get("tool_name") or "").strip()
decision_tool_arguments = decision.get("tool_arguments") if isinstance(decision.get("tool_arguments"), dict) else {}
action_to_tool = {
"clear_context": "limpar_contexto_conversa",
"continue_queue": "continuar_proximo_pedido",
"discard_queue": "descartar_pedidos_pendentes",
"cancel_active_flow": "cancelar_fluxo_atual",
}
planned_tool_name = decision_tool_name or action_to_tool.get(decision_action)
if planned_tool_name in ORCHESTRATION_CONTROL_TOOLS:
if (
planned_tool_name == "cancelar_fluxo_atual"
and self.policy.should_defer_flow_cancellation_control(message=message, user_id=user_id)
):
return None
try:
tool_result = await self.tool_executor.execute(
planned_tool_name,
decision_tool_arguments or {},
user_id=user_id,
)
except HTTPException as exc:
return await finish(self._http_exception_detail(exc), queue_notice=queue_notice)
return await finish(
self._fallback_format_tool_result(planned_tool_name, tool_result),
queue_notice=queue_notice,
)
tools = self.registry.get_tools()
llm_result = await self.llm.generate_response(
message=self._build_router_prompt(user_message=message, user_id=user_id),
@ -338,6 +432,79 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin):
queue_notice=queue_notice,
)
async def _try_execute_business_tool_from_turn_decision(
self,
message: str,
user_id: int | None,
turn_decision: dict | None,
queue_notice: str | None,
finish,
) -> str | None:
decision = turn_decision or {}
if str(decision.get("action") or "").strip() != "call_tool":
return None
tool_name = str(decision.get("tool_name") or "").strip()
if not tool_name or tool_name in ORCHESTRATION_CONTROL_TOOLS:
return None
arguments = decision.get("tool_arguments") if isinstance(decision.get("tool_arguments"), dict) else {}
try:
tool_result = await self.tool_executor.execute(
tool_name,
arguments,
user_id=user_id,
)
except HTTPException as exc:
self._capture_review_confirmation_suggestion(
tool_name=tool_name,
arguments=arguments,
exc=exc,
user_id=user_id,
)
return await finish(self._http_exception_detail(exc), queue_notice=queue_notice)
stock_suggestion_response = await self._maybe_build_stock_suggestion_response(
tool_name=tool_name,
arguments=arguments,
tool_result=tool_result,
user_id=user_id,
)
if stock_suggestion_response:
return await finish(stock_suggestion_response, queue_notice=queue_notice)
self._capture_tool_result_context(
tool_name=tool_name,
tool_result=tool_result,
user_id=user_id,
)
if self._should_use_deterministic_response(tool_name):
return await finish(
self._fallback_format_tool_result(tool_name, tool_result),
queue_notice=queue_notice,
)
final_response = await self.llm.generate_response(
message=self._build_result_prompt(
user_message=message,
user_id=user_id,
tool_name=tool_name,
tool_result=tool_result,
),
tools=[],
)
text = (final_response.get("response") or "").strip()
if self._is_low_value_response(text):
return await finish(
self._fallback_format_tool_result(tool_name, tool_result),
queue_notice=queue_notice,
)
return await finish(
text or self._fallback_format_tool_result(tool_name, tool_result),
queue_notice=queue_notice,
)
def _reset_pending_review_states(self, user_id: int | None) -> None:
if user_id is None:
return
@ -553,6 +720,69 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin):
if sanitized:
context["selected_vehicle"] = None
async def _maybe_build_stock_suggestion_response(
self,
tool_name: str,
arguments: dict | None,
tool_result,
user_id: int | None,
) -> str | None:
if tool_name != "consultar_estoque" or not isinstance(tool_result, list) or tool_result:
return None
budget = self._normalize_positive_number((arguments or {}).get("preco_max"))
if not budget:
return None
relaxed_arguments = dict(arguments or {})
relaxed_arguments["preco_max"] = max(float(budget) * 1.2, float(budget) + 10000.0)
relaxed_arguments["limite"] = min(max(int((arguments or {}).get("limite") or 5), 1), 5)
relaxed_arguments["ordenar_preco"] = "asc"
try:
relaxed_result = await self.tool_executor.execute(
"consultar_estoque",
relaxed_arguments,
user_id=user_id,
)
except HTTPException:
return None
if not isinstance(relaxed_result, list):
return None
nearby = []
for item in relaxed_result:
if not isinstance(item, dict):
continue
try:
price = float(item.get("preco") or 0)
except (TypeError, ValueError):
continue
if price > float(budget):
nearby.append(item)
if not nearby:
return None
self._capture_tool_result_context(
tool_name="consultar_estoque",
tool_result=nearby,
user_id=user_id,
)
budget_label = f"R$ {float(budget):,.0f}".replace(",", ".")
lines = [f"Nao encontrei veiculos ate {budget_label}."]
lines.append("Mas achei algumas opcoes proximas ao seu orcamento:")
for idx, item in enumerate(nearby[:5], start=1):
modelo = str(item.get("modelo") or "N/A")
categoria = str(item.get("categoria") or "N/A")
codigo = item.get("id", "N/A")
preco = f"R$ {float(item.get('preco') or 0):,.2f}".replace(",", "X").replace(".", ",").replace("X", ".")
lines.append(f"{idx}. [{codigo}] {modelo} ({categoria}) - {preco}")
lines.append("Se quiser, responda com o numero da lista ou com o modelo.")
return "\n".join(lines)
def _new_tab_memory(self, user_id: int | None) -> dict:
context = self._get_user_context(user_id)
if not context:
@ -583,12 +813,64 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin):
async def _extract_entities_with_llm(self, message: str, user_id: int | None) -> dict:
return await self.planner.extract_entities(message=message, user_id=user_id)
async def _extract_turn_decision_with_llm(self, message: str, user_id: int | None) -> dict:
return await self.planner.extract_turn_decision(message=message, user_id=user_id)
def _resolve_entities_for_message_plan(self, message_plan: dict, routed_message: str) -> dict:
return self.planner.resolve_entities_for_message_plan(message_plan=message_plan, routed_message=routed_message)
def _has_useful_extraction(self, extraction: dict | None) -> bool:
return self.normalizer.has_useful_extraction(extraction)
def _has_useful_turn_decision(self, turn_decision: dict | None) -> bool:
if not isinstance(turn_decision, dict):
return False
if (turn_decision.get("intent") or "general") != "general":
return True
if (turn_decision.get("action") or "answer_user") != "answer_user":
return True
entities = turn_decision.get("entities")
return self._has_useful_extraction(self._extracted_entities_from_turn_decision(turn_decision)) if isinstance(entities, dict) else False
def _extracted_entities_from_turn_decision(self, turn_decision: dict | None) -> dict:
entities = (turn_decision or {}).get("entities")
if not isinstance(entities, dict):
entities = {}
return {
"generic_memory": entities.get("generic_memory", {}),
"review_fields": entities.get("review_fields", {}),
"review_management_fields": entities.get("review_management_fields", {}),
"order_fields": entities.get("order_fields", {}),
"cancel_order_fields": entities.get("cancel_order_fields", {}),
"intents": {},
}
def _merge_extracted_entities(self, base: dict | None, override: dict | None) -> dict:
merged = self._empty_extraction_payload()
for section in ("generic_memory", "review_fields", "review_management_fields", "order_fields", "cancel_order_fields"):
left = (base or {}).get(section)
right = (override or {}).get(section)
payload = {}
if isinstance(left, dict):
payload.update(left)
if isinstance(right, dict):
payload.update(right)
merged[section] = payload
base_intents = (base or {}).get("intents")
override_intents = (override or {}).get("intents")
if isinstance(base_intents, dict):
merged["intents"].update(base_intents)
if isinstance(override_intents, dict):
merged["intents"].update(override_intents)
return merged
def _domain_from_turn_decision(self, turn_decision: dict | None) -> str:
domain = str((turn_decision or {}).get("domain") or "general").strip().lower()
if domain in {"review", "sales", "general"}:
return domain
return "general"
def _parse_json_object(self, text: str):
return self.normalizer.parse_json_object(text)
@ -722,8 +1004,8 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin):
def _is_order_selection_reset_message(self, message: str) -> bool:
return self.policy.is_order_selection_reset_message(message)
def _looks_like_fresh_operational_request(self, message: str) -> bool:
return self.policy.looks_like_fresh_operational_request(message)
def _looks_like_fresh_operational_request(self, message: str, turn_decision: dict | None = None) -> bool:
return self.policy.looks_like_fresh_operational_request(message, turn_decision=turn_decision)
def _detect_selected_order_index(self, message: str, orders: list[dict]) -> tuple[int | None, bool]:
return self.policy.detect_selected_order_index(message=message, orders=orders)
@ -732,8 +1014,13 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin):
self,
message: str,
user_id: int | None,
turn_decision: dict | None = None,
) -> str | None:
return await self.policy.try_resolve_pending_order_selection(message=message, user_id=user_id)
return await self.policy.try_resolve_pending_order_selection(
message=message,
user_id=user_id,
turn_decision=turn_decision,
)
def _render_queue_notice(self, queued_count: int) -> str | None:
return self.policy.render_queue_notice(queued_count)
@ -750,14 +1037,23 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin):
def _domain_from_intents(self, intents: dict | None) -> str:
return self.policy.domain_from_intents(intents)
def _is_context_switch_confirmation(self, message: str) -> bool:
return self.policy.is_context_switch_confirmation(message)
def _is_context_switch_confirmation(self, message: str, turn_decision: dict | None = None) -> bool:
return self.policy.is_context_switch_confirmation(message, turn_decision=turn_decision)
def _is_continue_queue_message(self, message: str) -> bool:
return self.policy.is_continue_queue_message(message)
def _is_continue_queue_message(self, message: str, turn_decision: dict | None = None) -> bool:
return self.policy.is_continue_queue_message(message, turn_decision=turn_decision)
async def _try_continue_queued_order(self, message: str, user_id: int | None) -> str | None:
return await self.policy.try_continue_queued_order(message=message, user_id=user_id)
async def _try_continue_queued_order(
self,
message: str,
user_id: int | None,
turn_decision: dict | None = None,
) -> str | None:
return await self.policy.try_continue_queued_order(
message=message,
user_id=user_id,
turn_decision=turn_decision,
)
def _has_open_flow(self, user_id: int | None, domain: str) -> bool:
return self.policy.has_open_flow(user_id=user_id, domain=domain)
@ -770,8 +1066,14 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin):
message: str,
user_id: int | None,
target_domain_hint: str = "general",
turn_decision: dict | None = None,
) -> str | None:
return self.policy.handle_context_switch(message=message, user_id=user_id, target_domain_hint=target_domain_hint)
return self.policy.handle_context_switch(
message=message,
user_id=user_id,
target_domain_hint=target_domain_hint,
turn_decision=turn_decision,
)
def _update_active_domain(self, user_id: int | None, domain_hint: str = "general") -> None:
self.policy.update_active_domain(user_id=user_id, domain_hint=domain_hint)

@ -0,0 +1,245 @@
import re
import unicodedata
from datetime import datetime, timedelta
def normalize_text(text: str) -> str:
normalized = unicodedata.normalize("NFKD", text or "")
ascii_text = normalized.encode("ascii", "ignore").decode("ascii")
return ascii_text.lower()
def normalize_plate(value) -> str | None:
text = str(value or "").strip().upper()
if not text:
return None
if re.fullmatch(r"[A-Z]{3}[0-9][A-Z0-9][0-9]{2}", text) or re.fullmatch(r"[A-Z]{3}[0-9]{4}", text):
return text
compact = re.sub(r"[^A-Z0-9]", "", text)
if re.fullmatch(r"[A-Z]{3}[0-9][A-Z0-9][0-9]{2}", compact) or re.fullmatch(r"[A-Z]{3}[0-9]{4}", compact):
return compact
return None
def normalize_cpf(value) -> str | None:
digits = re.sub(r"\D", "", str(value or ""))
if len(digits) == 11:
return digits
return None
def is_valid_cpf(value) -> bool:
digits = normalize_cpf(value)
if not digits:
return False
if digits == digits[0] * 11:
return False
numbers = [int(digit) for digit in digits]
sum_first = sum(number * weight for number, weight in zip(numbers[:9], range(10, 1, -1)))
first_digit = 11 - (sum_first % 11)
first_digit = 0 if first_digit >= 10 else first_digit
if first_digit != numbers[9]:
return False
sum_second = sum(number * weight for number, weight in zip(numbers[:10], range(11, 1, -1)))
second_digit = 11 - (sum_second % 11)
second_digit = 0 if second_digit >= 10 else second_digit
return second_digit == numbers[10]
def normalize_positive_number(value) -> float | None:
if value is None:
return None
if isinstance(value, (int, float)):
number = float(value)
return number if number > 0 else None
text = normalize_text(str(value))
text = text.replace("r$", "").strip()
multiplier = 1000 if "mil" in text else 1
text = text.replace("mil", "").strip()
digits = re.sub(r"[^0-9,.\s]", "", text)
if not digits:
return None
numeric = digits.replace(".", "").replace(" ", "").replace(",", ".")
try:
number = float(numeric) * multiplier
return number if number > 0 else None
except ValueError:
return None
def normalize_vehicle_profile(value) -> list[str]:
if value is None:
return []
allowed = {"suv", "sedan", "hatch", "pickup"}
items = value if isinstance(value, list) else [value]
normalized: list[str] = []
for item in items:
marker = normalize_text(str(item)).strip()
if marker in allowed and marker not in normalized:
normalized.append(marker)
return normalized
def normalize_bool(value) -> bool | None:
if isinstance(value, bool):
return value
lowered = normalize_text(str(value or "")).strip()
if lowered in {"sim", "true", "1", "yes"}:
return True
if lowered in {"nao", "false", "0", "no"}:
return False
return None
def normalize_datetime_connector(text: str) -> str:
compact = " ".join(str(text or "").strip().split())
return re.sub(r"\s+[aàáâã]s\s+", " ", compact, flags=re.IGNORECASE).strip()
def try_parse_iso_datetime(text: str) -> datetime | None:
candidate = str(text or "").strip()
if not candidate:
return None
try:
return datetime.fromisoformat(candidate.replace("Z", "+00:00"))
except ValueError:
return None
def try_parse_datetime_with_formats(text: str, formats: tuple[str, ...]) -> datetime | None:
candidate = str(text or "").strip()
if not candidate:
return None
for fmt in formats:
try:
return datetime.strptime(candidate, fmt)
except ValueError:
continue
return None
def try_parse_review_absolute_datetime(text: str) -> datetime | None:
normalized = normalize_datetime_connector(text)
parsed = try_parse_iso_datetime(normalized)
if parsed is not None:
return parsed
day_first_formats = (
"%d/%m/%Y %H:%M",
"%d/%m/%Y %H:%M:%S",
"%d-%m-%Y %H:%M",
"%d-%m-%Y %H:%M:%S",
)
year_first_formats = (
"%Y/%m/%d %H:%M",
"%Y/%m/%d %H:%M:%S",
"%Y-%m-%d %H:%M",
"%Y-%m-%d %H:%M:%S",
)
return try_parse_datetime_with_formats(normalized, day_first_formats + year_first_formats)
def strip_token_edges(token: str) -> str:
cleaned = str(token or "").strip()
edge_chars = "[](){}<>,.;:!?\"'`"
while cleaned and cleaned[0] in edge_chars:
cleaned = cleaned[1:]
while cleaned and cleaned[-1] in edge_chars:
cleaned = cleaned[:-1]
return cleaned
def extract_hhmm_from_text(text: str) -> str | None:
cleaned = normalize_datetime_connector(text)
for token in cleaned.split():
normalized_token = strip_token_edges(token)
parts = normalized_token.split(":")
if len(parts) not in {2, 3}:
continue
if not all(part.isdigit() for part in parts):
continue
hour = int(parts[0])
minute = int(parts[1])
if 0 <= hour <= 23 and 0 <= minute <= 59:
return f"{hour:02d}:{minute:02d}"
return None
def normalize_review_datetime_text(value, now_provider=None) -> str | None:
text = str(value or "").strip()
if not text:
return None
absolute_dt = try_parse_review_absolute_datetime(text)
if absolute_dt is not None:
return text
normalized = normalize_text(text)
day_offset = None
if "amanha" in normalized:
day_offset = 1
elif "hoje" in normalized:
day_offset = 0
if day_offset is None:
return text
time_text = extract_hhmm_from_text(normalized)
if not time_text:
return text
hour_text, minute_text = time_text.split(":")
current_datetime = now_provider() if callable(now_provider) else datetime.now()
target_date = current_datetime + timedelta(days=day_offset)
return f"{target_date.strftime('%d/%m/%Y')} {int(hour_text):02d}:{int(minute_text):02d}"
def tokenize_text(text: str) -> list[str]:
return [token for token in str(text or "").split() if token]
def clean_protocol_token(token: str) -> str:
return strip_token_edges(str(token or "").strip().upper())
def is_valid_protocol_suffix(value: str) -> bool:
if not value or len(value) < 4:
return False
return all(char.isalnum() for char in value)
def normalize_review_protocol(value: str) -> str | None:
candidate = clean_protocol_token(value)
if not candidate.startswith("REV-"):
return None
parts = candidate.split("-")
if len(parts) != 3:
return None
prefix, date_part, suffix_part = parts
if prefix != "REV":
return None
if len(date_part) != 8 or not date_part.isdigit():
return None
try:
datetime.strptime(date_part, "%Y%m%d")
except ValueError:
return None
if not is_valid_protocol_suffix(suffix_part):
return None
return f"{prefix}-{date_part}-{suffix_part}"
def extract_review_protocol_from_text(text: str) -> str | None:
for token in tokenize_text(text):
normalized = normalize_review_protocol(token)
if normalized:
return normalized
return normalize_review_protocol(str(text or ""))
def normalize_order_number(value) -> str | None:
order_number = str(value or "").strip().upper()
if order_number and re.fullmatch(r"PED-[A-Z0-9\\-]+", order_number):
return order_number
return None

@ -0,0 +1,72 @@
from typing import Any, Literal
from pydantic import BaseModel, ConfigDict, Field, model_validator
# Esse modulo define o contrato estruturado esperado do modelo por turno.
TurnDomain = Literal["review", "sales", "general"]
TurnIntent = Literal[
"review_schedule",
"review_list",
"review_cancel",
"review_reschedule",
"order_create",
"order_cancel",
"inventory_search",
"conversation_reset",
"queue_continue",
"discard_queue",
"cancel_active_flow",
"general",
]
TurnAction = Literal[
"collect_review_schedule",
"collect_review_management",
"collect_order_create",
"collect_order_cancel",
"ask_missing_fields",
"answer_user",
"call_tool",
"clear_context",
"continue_queue",
"discard_queue",
"cancel_active_flow",
]
class DecisionEntities(BaseModel):
model_config = ConfigDict(extra="forbid")
# As entidades continuam separadas por tipo de fluxo para facilitar
# compatibilidade com os mixins e validadores tecnicos atuais.
generic_memory: dict[str, Any] = Field(default_factory=dict)
review_fields: dict[str, Any] = Field(default_factory=dict)
review_management_fields: dict[str, Any] = Field(default_factory=dict)
order_fields: dict[str, Any] = Field(default_factory=dict)
cancel_order_fields: dict[str, Any] = Field(default_factory=dict)
class TurnDecision(BaseModel):
model_config = ConfigDict(extra="forbid")
# O modelo decide a intencao, o dominio e a acao do turno.
intent: TurnIntent = "general"
domain: TurnDomain = "general"
action: TurnAction = "answer_user"
entities: DecisionEntities = Field(default_factory=DecisionEntities)
missing_fields: list[str] = Field(default_factory=list)
selection_index: int | None = None
tool_name: str | None = None
tool_arguments: dict[str, Any] = Field(default_factory=dict)
response_to_user: str | None = None
@model_validator(mode="after")
def validate_contract(self):
if self.action == "ask_missing_fields":
if not self.missing_fields or not str(self.response_to_user or "").strip():
raise ValueError("ask_missing_fields exige missing_fields e response_to_user")
if self.action == "call_tool" and not str(self.tool_name or "").strip():
raise ValueError("call_tool exige tool_name")
if self.selection_index is not None and self.selection_index < 0:
raise ValueError("selection_index deve ser maior ou igual a zero")
return self

@ -0,0 +1,452 @@
import os
import unittest
os.environ.setdefault("DEBUG", "false")
from datetime import datetime, timedelta
from app.services.orchestration.conversation_policy import ConversationPolicy
from app.services.orchestration.entity_normalizer import EntityNormalizer
from app.services.orchestration.message_planner import MessagePlanner
from app.services.orchestration.orquestrador_service import OrquestradorService
class FakeLLM:
def __init__(self, responses):
self.responses = list(responses)
self.calls = 0
async def generate_response(self, message: str, tools):
self.calls += 1
if self.responses:
return self.responses.pop(0)
return {"response": "", "tool_call": None}
class FakeState:
def __init__(self, entries=None, contexts=None):
self.entries = entries or {}
self.contexts = contexts or {}
def get_entry(self, bucket: str, user_id: int | None, *, expire: bool = False):
if user_id is None:
return None
return self.entries.get(bucket, {}).get(user_id)
def set_entry(self, bucket: str, user_id: int | None, value: dict):
if user_id is None:
return
self.entries.setdefault(bucket, {})[user_id] = value
def pop_entry(self, bucket: str, user_id: int | None):
if user_id is None:
return None
return self.entries.get(bucket, {}).pop(user_id, None)
class FakeToolExecutor:
def __init__(self, result=None):
self.result = result or {"ok": True}
self.calls = []
async def execute(self, tool_name: str, arguments: dict, user_id: int | None = None):
self.calls.append((tool_name, arguments, user_id))
if tool_name == "consultar_estoque" and arguments.get("preco_max") and float(arguments["preco_max"]) > 50000:
return [
{"id": 7, "modelo": "Hyundai HB20 2022", "categoria": "hatch", "preco": 54500.0},
{"id": 8, "modelo": "Chevrolet Onix 2023", "categoria": "hatch", "preco": 58900.0},
]
return self.result
class FakePolicyService:
def __init__(self, state):
self.state = state
self.normalizer = EntityNormalizer()
def _get_user_context(self, user_id: int | None):
if user_id is None:
return None
return self.state.contexts.get(user_id)
def _new_tab_memory(self, user_id: int | None):
return {}
def _is_affirmative_message(self, text: str) -> bool:
normalized = self.normalizer.normalize_text(text).strip().rstrip(".!?,;:")
return normalized in {"sim", "pode", "ok", "confirmo", "aceito", "fechado", "pode sim"}
def _is_negative_message(self, text: str) -> bool:
normalized = self.normalizer.normalize_text(text).strip().rstrip(".!?,;:")
return normalized in {"nao", "nao quero"} or normalized.startswith("nao")
def _clear_user_conversation_state(self, user_id: int | None) -> None:
context = self._get_user_context(user_id)
if context:
context["pending_order_selection"] = None
async def handle_message(self, message: str, user_id: int | None = None) -> str:
return f"handled:{message}"
def _render_missing_review_fields_prompt(self, missing_fields: list[str]) -> str:
return "missing review"
def _render_missing_review_reschedule_fields_prompt(self, missing_fields: list[str]) -> str:
return "missing review reschedule"
def _render_missing_review_cancel_fields_prompt(self, missing_fields: list[str]) -> str:
return "missing review cancel"
def _render_review_reuse_question(self) -> str:
return "reuse review?"
def _render_missing_order_fields_prompt(self, missing_fields: list[str]) -> str:
return "missing order"
def _render_missing_cancel_order_fields_prompt(self, missing_fields: list[str]) -> str:
return "missing cancel order"
class TurnDecisionContractTests(unittest.IsolatedAsyncioTestCase):
async def test_extract_turn_decision_retries_once_and_returns_structured_payload(self):
llm = FakeLLM(
[
{"response": "nao eh json", "tool_call": None},
{
"response": """
{
"intent": "review_schedule",
"domain": "review",
"action": "ask_missing_fields",
"entities": {
"generic_memory": {},
"review_fields": {"placa": "abc1234", "data_hora": "10/03/2026 às 09:00"},
"review_management_fields": {},
"order_fields": {},
"cancel_order_fields": {}
},
"missing_fields": ["modelo", "ano", "km"],
"tool_name": null,
"tool_arguments": {},
"response_to_user": "Preciso do modelo, ano e quilometragem."
}
""",
"tool_call": None,
},
]
)
planner = MessagePlanner(llm=llm, normalizer=EntityNormalizer())
decision = await planner.extract_turn_decision("Quero agendar revisão amanhã às 09:00", user_id=7)
self.assertEqual(llm.calls, 2)
self.assertEqual(decision["intent"], "review_schedule")
self.assertEqual(decision["domain"], "review")
self.assertEqual(decision["action"], "ask_missing_fields")
self.assertEqual(decision["entities"]["review_fields"]["placa"], "ABC1234")
self.assertEqual(decision["entities"]["review_fields"]["data_hora"], "10/03/2026 às 09:00")
self.assertEqual(decision["missing_fields"], ["modelo", "ano", "km"])
def test_coerce_turn_decision_rejects_invalid_shape_with_fallback(self):
normalizer = EntityNormalizer()
decision = normalizer.coerce_turn_decision(
{
"intent": "valor_invalido",
"domain": "sales",
"action": "call_tool",
"entities": [],
}
)
self.assertEqual(decision["intent"], "general")
self.assertEqual(decision["domain"], "general")
self.assertEqual(decision["action"], "answer_user")
self.assertEqual(decision["entities"]["order_fields"], {})
def test_coerce_turn_decision_rejects_missing_fields_without_response_payload(self):
normalizer = EntityNormalizer()
decision = normalizer.coerce_turn_decision(
{
"intent": "review_schedule",
"domain": "review",
"action": "ask_missing_fields",
"entities": {
"generic_memory": {},
"review_fields": {},
"review_management_fields": {},
"order_fields": {},
"cancel_order_fields": {},
},
"missing_fields": [],
"tool_name": None,
"tool_arguments": {},
"response_to_user": "",
}
)
self.assertEqual(decision["intent"], "general")
self.assertEqual(decision["action"], "answer_user")
def test_turn_decision_entities_do_not_rebuild_legacy_intents(self):
service = OrquestradorService.__new__(OrquestradorService)
service.normalizer = EntityNormalizer()
extracted = service._extracted_entities_from_turn_decision(
{
"intent": "order_create",
"domain": "sales",
"action": "collect_order_create",
"entities": {
"generic_memory": {"cpf": "12345678909"},
"review_fields": {},
"review_management_fields": {},
"order_fields": {"vehicle_id": 1},
"cancel_order_fields": {},
},
}
)
self.assertEqual(extracted["intents"], {})
self.assertEqual(extracted["order_fields"]["vehicle_id"], 1)
def test_turn_decision_entity_merge_preserves_generic_memory_from_previous_extraction(self):
service = OrquestradorService.__new__(OrquestradorService)
service.normalizer = EntityNormalizer()
service._empty_extraction_payload = service.normalizer.empty_extraction_payload
merged = service._merge_extracted_entities(
{
"generic_memory": {"orcamento_max": 70000},
"review_fields": {},
"review_management_fields": {},
"order_fields": {"cpf": "12345678909"},
"cancel_order_fields": {},
"intents": {},
},
{
"generic_memory": {},
"review_fields": {},
"review_management_fields": {},
"order_fields": {},
"cancel_order_fields": {},
"intents": {},
},
)
self.assertEqual(merged["generic_memory"]["orcamento_max"], 70000)
self.assertEqual(merged["order_fields"]["cpf"], "12345678909")
def test_entity_merge_can_enrich_message_plan_with_full_extraction(self):
service = OrquestradorService.__new__(OrquestradorService)
service.normalizer = EntityNormalizer()
service._empty_extraction_payload = service.normalizer.empty_extraction_payload
merged = service._merge_extracted_entities(
{
"generic_memory": {},
"review_fields": {},
"review_management_fields": {},
"order_fields": {"cpf": "12345678909"},
"cancel_order_fields": {},
"intents": {},
},
{
"generic_memory": {"orcamento_max": 70000},
"review_fields": {},
"review_management_fields": {},
"order_fields": {},
"cancel_order_fields": {},
"intents": {},
},
)
self.assertEqual(merged["generic_memory"]["orcamento_max"], 70000)
self.assertEqual(merged["order_fields"]["cpf"], "12345678909")
async def test_turn_decision_call_tool_executes_without_router(self):
service = OrquestradorService.__new__(OrquestradorService)
service.tool_executor = FakeToolExecutor(result={"numero_pedido": "PED-1", "status": "Ativo"})
service.llm = FakeLLM([])
service._capture_review_confirmation_suggestion = lambda **kwargs: None
service._capture_tool_result_context = lambda **kwargs: None
service._should_use_deterministic_response = lambda tool_name: True
service._fallback_format_tool_result = lambda tool_name, tool_result: f"{tool_name}:{tool_result['numero_pedido']}"
service._build_result_prompt = lambda **kwargs: "unused"
service._http_exception_detail = lambda exc: str(exc)
service._is_low_value_response = lambda text: False
async def finish(response: str, queue_notice: str | None = None) -> str:
return response if not queue_notice else f"{queue_notice}\n{response}"
response = await service._try_execute_business_tool_from_turn_decision(
message="quero fechar o pedido",
user_id=7,
turn_decision={
"action": "call_tool",
"tool_name": "realizar_pedido",
"tool_arguments": {"cpf": "12345678909", "vehicle_id": 1},
},
queue_notice=None,
finish=finish,
)
self.assertEqual(
service.tool_executor.calls,
[("realizar_pedido", {"cpf": "12345678909", "vehicle_id": 1}, 7)],
)
self.assertEqual(response, "realizar_pedido:PED-1")
self.assertEqual(service.llm.calls, 0)
async def test_empty_stock_search_suggests_nearby_options(self):
service = OrquestradorService.__new__(OrquestradorService)
service.normalizer = EntityNormalizer()
service.tool_executor = FakeToolExecutor(result=[])
service._get_user_context = lambda user_id: {
"generic_memory": {},
"shared_memory": {},
"last_stock_results": [],
"selected_vehicle": None,
}
service._capture_tool_result_context = lambda tool_name, tool_result, user_id: None
service._normalize_positive_number = service.normalizer.normalize_positive_number
response = await service._maybe_build_stock_suggestion_response(
tool_name="consultar_estoque",
arguments={"preco_max": 50000, "limite": 5},
tool_result=[],
user_id=5,
)
self.assertIn("Nao encontrei veiculos ate R$ 50.000.", response)
self.assertIn("Hyundai HB20 2022", response)
self.assertIn("Se quiser, responda com o numero da lista ou com o modelo.", response)
async def test_turn_decision_answer_user_can_short_circuit_router(self):
decision = {
"intent": "general",
"domain": "general",
"action": "answer_user",
"response_to_user": "Resposta direta do contrato.",
}
self.assertEqual(str(decision.get("action") or ""), "answer_user")
self.assertEqual(str(decision.get("response_to_user") or "").strip(), "Resposta direta do contrato.")
async def test_pending_order_selection_prefers_turn_decision_domain(self):
state = FakeState(
contexts={
9: {
"pending_order_selection": {
"orders": [
{"domain": "review", "message": "agendar revisao", "memory_seed": {}},
{"domain": "sales", "message": "fazer pedido", "memory_seed": {}},
],
"expires_at": datetime.utcnow() + timedelta(minutes=15),
},
"order_queue": [],
"active_domain": "general",
"generic_memory": {},
}
}
)
policy = ConversationPolicy(service=FakePolicyService(state))
response = await policy.try_resolve_pending_order_selection(
message="quero comprar",
user_id=9,
turn_decision={"domain": "sales", "intent": "order_create", "action": "collect_order_create"},
)
self.assertIn("Vou comecar por: Venda: fazer pedido", response)
async def test_pending_order_selection_prefers_turn_decision_selection_index(self):
state = FakeState(
contexts={
9: {
"pending_order_selection": {
"orders": [
{"domain": "review", "message": "agendar revisao", "memory_seed": {}},
{"domain": "sales", "message": "fazer pedido", "memory_seed": {}},
],
"expires_at": datetime.utcnow() + timedelta(minutes=15),
},
"order_queue": [],
"active_domain": "general",
"generic_memory": {},
}
}
)
policy = ConversationPolicy(service=FakePolicyService(state))
response = await policy.try_resolve_pending_order_selection(
message="esse",
user_id=9,
turn_decision={"domain": "general", "intent": "general", "action": "answer_user", "selection_index": 1},
)
self.assertIn("Vou comecar por: Venda: fazer pedido", response)
async def test_try_continue_queue_prefers_turn_decision_action(self):
state = FakeState(
contexts={
9: {
"pending_switch": {
"target_domain": "sales",
"queued_message": "fazer pedido",
"memory_seed": {"cpf": "12345678909"},
"expires_at": datetime.utcnow() + timedelta(minutes=15),
},
"active_domain": "general",
"generic_memory": {},
"pending_order_selection": None,
}
}
)
service = FakePolicyService(state)
policy = ConversationPolicy(service=service)
policy.apply_domain_switch = lambda user_id, target_domain: service._get_user_context(user_id).update(
{"active_domain": target_domain, "pending_switch": None}
)
response = await policy.try_continue_queued_order(
message="ok",
user_id=9,
turn_decision={"action": "continue_queue", "intent": "queue_continue", "domain": "sales"},
)
self.assertIn("Agora, sobre a compra do veiculo:", response)
def test_handle_context_switch_prefers_turn_decision_domain_confirmation(self):
state = FakeState(
contexts={
9: {
"pending_switch": {
"target_domain": "review",
"expires_at": datetime.utcnow() + timedelta(minutes=15),
},
"active_domain": "sales",
"generic_memory": {},
"pending_order_selection": None,
}
}
)
service = FakePolicyService(state)
policy = ConversationPolicy(service=service)
policy.apply_domain_switch = lambda user_id, target_domain: service._get_user_context(user_id).update(
{"active_domain": target_domain, "pending_switch": None}
)
response = policy.handle_context_switch(
message="quero revisar",
user_id=9,
target_domain_hint="review",
turn_decision={"domain": "review", "intent": "review_schedule", "action": "collect_review_schedule"},
)
self.assertEqual(response, "Certo, contexto anterior encerrado. Vamos seguir com agendamento de revisao.")
if __name__ == "__main__":
unittest.main()
Loading…
Cancel
Save