🐛 fix(orchestration): blindar continuidade de contexto e fluxos estruturados de revisao e compra

- adiciona snapshots de fluxo, collected_slots e last_tool_result ao estado conversacional, incluindo persistencia no backend em memoria e no Redis\n- endurece o reaproveitamento de revisao, o reset imediato de contexto e a retomada incremental sem perder drafts apos respostas fracas do modelo\n- prioriza follow-ups operacionais de vendas antes do LLM para selecao de estoque, CPF, nova busca e continuidade de pedido/cancelamento\n- normaliza aliases de tools de compra, persiste selecoes pendentes de estoque e preserva sugestoes com budget_relaxed entre turnos\n- melhora a policy de troca de contexto para confirmacoes ambiguas, novos pedidos operacionais e onboarding orientado apos mudanca de dominio\n- amplia a cobertura de regressao para revisao, compra, cancelamento, reset global e execucao estruturada do orquestrador
main
parent 3cf5bf863a
commit af30a81cef

@ -11,6 +11,7 @@ from app.services.orchestration.orchestrator_config import (
ORDER_REQUIRED_FIELDS,
PENDING_CANCEL_ORDER_DRAFT_TTL_MINUTES,
PENDING_ORDER_DRAFT_TTL_MINUTES,
PENDING_ORDER_SELECTION_TTL_MINUTES,
)
from app.services.user.mock_customer_service import hydrate_mock_customer_from_cpf
@ -18,6 +19,127 @@ from app.services.user.mock_customer_service import hydrate_mock_customer_from_c
# Esse mixin cuida dos fluxos de venda:
# criacao de pedido, selecao de veiculo e cancelamento.
class OrderFlowMixin:
def _sanitize_stock_results(self, stock_results: list[dict] | None) -> list[dict]:
sanitized: list[dict] = []
for item in stock_results or []:
if not isinstance(item, dict):
continue
try:
vehicle_id = int(item.get("id"))
preco = float(item.get("preco") or 0)
except (TypeError, ValueError):
continue
sanitized.append(
{
"id": vehicle_id,
"modelo": str(item.get("modelo") or "").strip(),
"categoria": str(item.get("categoria") or "").strip(),
"preco": preco,
"budget_relaxed": bool(item.get("budget_relaxed", False)),
}
)
return sanitized
def _get_order_flow_snapshot(self, user_id: int | None, snapshot_key: str) -> dict | None:
if user_id is None or not hasattr(self, "_get_user_context"):
return None
context = self._get_user_context(user_id)
if not isinstance(context, dict):
return None
snapshots = context.get("flow_snapshots")
if not isinstance(snapshots, dict):
return None
snapshot = snapshots.get(snapshot_key)
return dict(snapshot) if isinstance(snapshot, dict) else None
def _set_order_flow_snapshot(
self,
user_id: int | None,
snapshot_key: str,
value: dict | None,
*,
active_task: str | None = None,
) -> None:
if user_id is None or not hasattr(self, "_get_user_context") or not hasattr(self, "_save_user_context"):
return
context = self._get_user_context(user_id)
if not isinstance(context, dict):
return
snapshots = context.get("flow_snapshots")
if not isinstance(snapshots, dict):
snapshots = {}
context["flow_snapshots"] = snapshots
if isinstance(value, dict):
snapshots[snapshot_key] = value
if active_task:
context["active_task"] = active_task
collected_slots = context.get("collected_slots")
if not isinstance(collected_slots, dict):
collected_slots = {}
context["collected_slots"] = collected_slots
payload = value.get("payload")
if isinstance(payload, dict):
collected_slots[active_task] = dict(payload)
else:
snapshots.pop(snapshot_key, None)
if active_task and context.get("active_task") == active_task:
context["active_task"] = None
collected_slots = context.get("collected_slots")
if isinstance(collected_slots, dict) and active_task:
collected_slots.pop(active_task, None)
self._save_user_context(user_id=user_id, context=context)
def _get_order_flow_entry(self, bucket: str, user_id: int | None, snapshot_key: str) -> dict | None:
entry = self.state.get_entry(bucket, user_id, expire=True)
if entry:
return entry
snapshot = self._get_order_flow_snapshot(user_id=user_id, snapshot_key=snapshot_key)
if not snapshot:
return None
if snapshot.get("expires_at") and snapshot["expires_at"] < datetime.utcnow():
self._set_order_flow_snapshot(user_id=user_id, snapshot_key=snapshot_key, value=None)
return None
self.state.set_entry(bucket, user_id, snapshot)
return snapshot
def _set_order_flow_entry(
self,
bucket: str,
user_id: int | None,
snapshot_key: str,
value: dict,
*,
active_task: str | None = None,
) -> None:
self.state.set_entry(bucket, user_id, value)
self._set_order_flow_snapshot(
user_id=user_id,
snapshot_key=snapshot_key,
value=value,
active_task=active_task,
)
def _pop_order_flow_entry(
self,
bucket: str,
user_id: int | None,
snapshot_key: str,
*,
active_task: str | None = None,
) -> dict | None:
entry = self.state.pop_entry(bucket, user_id)
self._set_order_flow_snapshot(
user_id=user_id,
snapshot_key=snapshot_key,
value=None,
active_task=active_task,
)
return entry
def _decision_intent(self, turn_decision: dict | None) -> str:
return str((turn_decision or {}).get("intent") or "").strip().lower()
@ -108,12 +230,11 @@ class OrderFlowMixin:
if not isinstance(generic_memory, dict):
generic_memory = {}
context["generic_memory"] = generic_memory
if generic_memory.get("orcamento_max"):
return
budget = extract_budget_from_text(message)
if budget:
generic_memory["orcamento_max"] = int(round(budget))
context.setdefault("shared_memory", {})["orcamento_max"] = int(round(budget))
normalized_budget = int(round(budget))
generic_memory["orcamento_max"] = normalized_budget
context.setdefault("shared_memory", {})["orcamento_max"] = normalized_budget
self._save_user_context(user_id=user_id, context=context)
def _try_prefill_order_cpf_from_memory(self, user_id: int | None, payload: dict) -> None:
@ -141,11 +262,34 @@ class OrderFlowMixin:
db.close()
def _get_last_stock_results(self, user_id: int | None) -> list[dict]:
pending_selection = self.state.get_entry("pending_stock_selections", user_id, expire=True)
if isinstance(pending_selection, dict):
payload = pending_selection.get("payload")
if isinstance(payload, list):
sanitized = self._sanitize_stock_results(payload)
if sanitized:
return sanitized
context = self._get_user_context(user_id)
if not context:
return []
stock_results = context.get("last_stock_results") or []
return stock_results if isinstance(stock_results, list) else []
return self._sanitize_stock_results(stock_results if isinstance(stock_results, list) else [])
def _store_pending_stock_selection(self, user_id: int | None, stock_results: list[dict] | None) -> None:
if user_id is None:
return
sanitized = self._sanitize_stock_results(stock_results)
if not sanitized:
self.state.pop_entry("pending_stock_selections", user_id)
return
self.state.set_entry(
"pending_stock_selections",
user_id,
{
"payload": sanitized,
"expires_at": datetime.utcnow() + timedelta(minutes=PENDING_ORDER_SELECTION_TTL_MINUTES),
},
)
def _get_selected_vehicle(self, user_id: int | None) -> dict | None:
context = self._get_user_context(user_id)
@ -165,24 +309,9 @@ class OrderFlowMixin:
context = self._get_user_context(user_id)
if not context:
return
sanitized: list[dict] = []
for item in stock_results or []:
if not isinstance(item, dict):
continue
try:
vehicle_id = int(item.get("id"))
preco = float(item.get("preco") or 0)
except (TypeError, ValueError):
continue
sanitized.append(
{
"id": vehicle_id,
"modelo": str(item.get("modelo") or "").strip(),
"categoria": str(item.get("categoria") or "").strip(),
"preco": preco,
}
)
sanitized = self._sanitize_stock_results(stock_results)
context["last_stock_results"] = sanitized
self._store_pending_stock_selection(user_id=user_id, stock_results=sanitized)
if sanitized:
context["selected_vehicle"] = None
context["pending_single_vehicle_confirmation"] = None
@ -196,6 +325,7 @@ class OrderFlowMixin:
return
context["selected_vehicle"] = dict(vehicle) if isinstance(vehicle, dict) else None
context["pending_single_vehicle_confirmation"] = None
self.state.pop_entry("pending_stock_selections", user_id)
self._save_user_context(user_id=user_id, context=context)
def _store_pending_single_vehicle_confirmation(self, user_id: int | None, vehicle: dict | None) -> None:
@ -268,13 +398,15 @@ class OrderFlowMixin:
if isinstance(budget, (int, float)) and float(budget) > 0:
if isinstance(selected_vehicle, dict):
try:
if float(selected_vehicle.get("preco") or 0) > float(budget):
if not bool(selected_vehicle.get("budget_relaxed")) and float(selected_vehicle.get("preco") or 0) > float(budget):
return True
except (TypeError, ValueError):
return True
for item in last_stock_results:
if not isinstance(item, dict):
continue
if bool(item.get("budget_relaxed")):
continue
try:
if float(item.get("preco") or 0) > float(budget):
return True
@ -312,6 +444,7 @@ class OrderFlowMixin:
context["last_stock_results"] = []
context["selected_vehicle"] = None
context["pending_single_vehicle_confirmation"] = None
self.state.pop_entry("pending_stock_selections", user_id)
self._save_user_context(user_id=user_id, context=context)
def _match_vehicle_from_message_index(self, message: str, stock_results: list[dict]) -> dict | None:
@ -377,6 +510,83 @@ class OrderFlowMixin:
return None
def _should_bootstrap_order_from_context(
self,
message: str,
user_id: int | None,
payload: dict | None = None,
) -> bool:
if user_id is None:
return False
pending_single_vehicle = self._get_pending_single_vehicle_confirmation(user_id=user_id)
if pending_single_vehicle and (
self._message_confirms_single_vehicle(message=message, vehicle=pending_single_vehicle)
or self._is_negative_message(message)
):
return True
stock_results = self._get_last_stock_results(user_id=user_id)
if not stock_results:
return False
normalized_payload = payload if isinstance(payload, dict) else {}
return bool(
self._match_vehicle_from_message_model(message=message, stock_results=stock_results)
or self._match_vehicle_from_message_index(message=message, stock_results=stock_results)
or (
normalized_payload.get("modelo_veiculo")
and self._try_resolve_order_vehicle(message=message, user_id=user_id, payload=normalized_payload)
)
)
def _should_restart_open_order_draft(
self,
message: str,
user_id: int | None,
turn_decision: dict | None = None,
) -> bool:
if user_id is None:
return False
if self.normalizer.normalize_cpf(message):
return False
if self._should_bootstrap_order_from_context(message=message, user_id=user_id, payload={}):
return False
current_draft = self.state.get_entry("pending_order_drafts", user_id, expire=True)
draft_payload = current_draft.get("payload", {}) if isinstance(current_draft, dict) else {}
selected_vehicle = self._get_selected_vehicle(user_id=user_id)
if self._has_stock_listing_request(message=message, turn_decision=turn_decision):
return bool(
(isinstance(draft_payload, dict) and draft_payload.get("vehicle_id"))
or selected_vehicle
)
if not self._has_explicit_order_request(message):
return False
normalized = self._normalize_text(message).strip()
if extract_budget_from_text(message) is not None:
return True
restart_terms = {
"agora quero comprar",
"quero comprar outro",
"outro carro",
"outro veiculo",
"nova busca",
"novo pedido",
"faixa de preco",
"faixa de valor",
"ate ",
"modelo ",
"tipo de carro",
"suv",
"sedan",
"hatch",
"pickup",
"picape",
}
return any(term in normalized for term in restart_terms)
def _render_missing_order_fields_prompt(self, missing_fields: list[str]) -> str:
labels = {
"cpf": "o CPF do cliente",
@ -500,12 +710,30 @@ class OrderFlowMixin:
return None
normalized_intents = self._normalize_intents(intents)
draft = self.state.get_entry("pending_order_drafts", user_id, expire=True)
draft = self._get_order_flow_entry("pending_order_drafts", user_id, "order_create")
extracted = self._normalize_order_fields(extracted_fields)
decision_intent = self._decision_intent(turn_decision)
has_intent = decision_intent == "order_create" or normalized_intents.get("order_create", False)
explicit_order_request = self._has_explicit_order_request(message)
if draft and self._should_restart_open_order_draft(
message=message,
user_id=user_id,
turn_decision=turn_decision,
):
self._pop_order_flow_entry(
"pending_order_drafts",
user_id,
"order_create",
active_task="order_create",
)
self._reset_order_stock_context(user_id=user_id)
draft = None
should_bootstrap_from_context = draft is None and self._should_bootstrap_order_from_context(
message=message,
user_id=user_id,
payload=extracted,
)
if (
draft
@ -526,10 +754,15 @@ class OrderFlowMixin:
)
and not extracted
):
self.state.pop_entry("pending_order_drafts", user_id)
self._pop_order_flow_entry(
"pending_order_drafts",
user_id,
"order_create",
active_task="order_create",
)
return None
if draft is None and not has_intent and not explicit_order_request:
if draft is None and not has_intent and not explicit_order_request and not should_bootstrap_from_context:
return None
if draft is None:
@ -559,11 +792,23 @@ class OrderFlowMixin:
elif self._is_negative_message(message):
self._clear_pending_single_vehicle_confirmation(user_id=user_id)
draft["expires_at"] = datetime.utcnow() + timedelta(minutes=PENDING_ORDER_DRAFT_TTL_MINUTES)
self.state.set_entry("pending_order_drafts", user_id, draft)
self._set_order_flow_entry(
"pending_order_drafts",
user_id,
"order_create",
draft,
active_task="order_create",
)
return "Sem problema. Me diga outro modelo ou ajuste o valor para eu buscar novas opcoes."
elif not self._has_explicit_order_request(message):
draft["expires_at"] = datetime.utcnow() + timedelta(minutes=PENDING_ORDER_DRAFT_TTL_MINUTES)
self.state.set_entry("pending_order_drafts", user_id, draft)
self._set_order_flow_entry(
"pending_order_drafts",
user_id,
"order_create",
draft,
active_task="order_create",
)
return self._render_single_vehicle_confirmation_prompt(pending_single_vehicle)
resolved_vehicle = self._try_resolve_order_vehicle(
@ -580,7 +825,13 @@ class OrderFlowMixin:
cpf_value = draft["payload"].get("cpf")
if cpf_value and not self._is_valid_cpf(str(cpf_value)):
draft["payload"].pop("cpf", None)
self.state.set_entry("pending_order_drafts", user_id, draft)
self._set_order_flow_entry(
"pending_order_drafts",
user_id,
"order_create",
draft,
active_task="order_create",
)
return "Para seguir com o pedido, preciso de um CPF valido. Pode me informar novamente?"
if cpf_value:
try:
@ -590,11 +841,23 @@ class OrderFlowMixin:
)
except ValueError:
draft["payload"].pop("cpf", None)
self.state.set_entry("pending_order_drafts", user_id, draft)
self._set_order_flow_entry(
"pending_order_drafts",
user_id,
"order_create",
draft,
active_task="order_create",
)
return "Para seguir com o pedido, preciso de um CPF valido. Pode me informar novamente?"
draft["expires_at"] = datetime.utcnow() + timedelta(minutes=PENDING_ORDER_DRAFT_TTL_MINUTES)
self.state.set_entry("pending_order_drafts", user_id, draft)
self._set_order_flow_entry(
"pending_order_drafts",
user_id,
"order_create",
draft,
active_task="order_create",
)
missing = [field for field in ORDER_REQUIRED_FIELDS if field not in draft["payload"]]
if missing:
@ -627,9 +890,21 @@ class OrderFlowMixin:
if error.get("retryable") and error.get("field"):
draft["payload"].pop(str(error["field"]), None)
draft["expires_at"] = datetime.utcnow() + timedelta(minutes=PENDING_ORDER_DRAFT_TTL_MINUTES)
self.state.set_entry("pending_order_drafts", user_id, draft)
self._set_order_flow_entry(
"pending_order_drafts",
user_id,
"order_create",
draft,
active_task="order_create",
)
return self._http_exception_detail(exc)
self.state.pop_entry("pending_order_drafts", user_id)
self._pop_order_flow_entry(
"pending_order_drafts",
user_id,
"order_create",
active_task="order_create",
)
self._reset_order_stock_context(user_id=user_id)
return self._fallback_format_tool_result("realizar_pedido", tool_result)
@ -645,8 +920,8 @@ class OrderFlowMixin:
return None
normalized_intents = self._normalize_intents(intents)
draft = self.state.get_entry("pending_cancel_order_drafts", user_id, expire=True)
active_order_draft = self.state.get_entry("pending_order_drafts", user_id, expire=True)
draft = self._get_order_flow_entry("pending_cancel_order_drafts", user_id, "order_cancel")
active_order_draft = self._get_order_flow_entry("pending_order_drafts", user_id, "order_create")
extracted = self._normalize_cancel_order_fields(extracted_fields)
decision_intent = self._decision_intent(turn_decision)
@ -683,7 +958,12 @@ class OrderFlowMixin:
)
and not extracted
):
self.state.pop_entry("pending_cancel_order_drafts", user_id)
self._pop_order_flow_entry(
"pending_cancel_order_drafts",
user_id,
"order_cancel",
active_task="order_cancel",
)
return None
if not has_intent and draft is None:
@ -708,7 +988,13 @@ class OrderFlowMixin:
draft["payload"].update(extracted)
draft["expires_at"] = datetime.utcnow() + timedelta(minutes=PENDING_CANCEL_ORDER_DRAFT_TTL_MINUTES)
self.state.set_entry("pending_cancel_order_drafts", user_id, draft)
self._set_order_flow_entry(
"pending_cancel_order_drafts",
user_id,
"order_cancel",
draft,
active_task="order_cancel",
)
missing = [field for field in CANCEL_ORDER_REQUIRED_FIELDS if field not in draft["payload"]]
if missing:
@ -725,8 +1011,19 @@ class OrderFlowMixin:
if error.get("retryable") and error.get("field"):
draft["payload"].pop(str(error["field"]), None)
draft["expires_at"] = datetime.utcnow() + timedelta(minutes=PENDING_CANCEL_ORDER_DRAFT_TTL_MINUTES)
self.state.set_entry("pending_cancel_order_drafts", user_id, draft)
self._set_order_flow_entry(
"pending_cancel_order_drafts",
user_id,
"order_cancel",
draft,
active_task="order_cancel",
)
return self._http_exception_detail(exc)
self.state.pop_entry("pending_cancel_order_drafts", user_id)
self._pop_order_flow_entry(
"pending_cancel_order_drafts",
user_id,
"order_cancel",
active_task="order_cancel",
)
return self._fallback_format_tool_result("cancelar_pedido", tool_result)

@ -12,6 +12,112 @@ from app.services.orchestration.orchestrator_config import (
# Esse mixin concentra os fluxos incrementais de revisao e pos-venda.
class ReviewFlowMixin:
def _review_now(self) -> datetime:
provider = getattr(self, "_review_now_provider", None)
if callable(provider):
return provider()
return datetime.now()
def _get_review_flow_snapshot(self, user_id: int | None, snapshot_key: str) -> dict | None:
if user_id is None or not hasattr(self, "_get_user_context"):
return None
context = self._get_user_context(user_id)
if not isinstance(context, dict):
return None
snapshots = context.get("flow_snapshots")
if not isinstance(snapshots, dict):
return None
snapshot = snapshots.get(snapshot_key)
return dict(snapshot) if isinstance(snapshot, dict) else None
def _set_review_flow_snapshot(
self,
user_id: int | None,
snapshot_key: str,
value: dict | None,
*,
active_task: str | None = None,
) -> None:
if user_id is None or not hasattr(self, "_get_user_context") or not hasattr(self, "_save_user_context"):
return
context = self._get_user_context(user_id)
if not isinstance(context, dict):
return
snapshots = context.get("flow_snapshots")
if not isinstance(snapshots, dict):
snapshots = {}
context["flow_snapshots"] = snapshots
if isinstance(value, dict):
snapshots[snapshot_key] = value
if active_task:
context["active_task"] = active_task
collected_slots = context.get("collected_slots")
if not isinstance(collected_slots, dict):
collected_slots = {}
context["collected_slots"] = collected_slots
payload = value.get("payload")
if isinstance(payload, dict):
collected_slots[active_task] = dict(payload)
else:
snapshots.pop(snapshot_key, None)
if active_task and context.get("active_task") == active_task:
context["active_task"] = None
collected_slots = context.get("collected_slots")
if isinstance(collected_slots, dict) and active_task:
collected_slots.pop(active_task, None)
self._save_user_context(user_id=user_id, context=context)
def _get_review_flow_entry(self, bucket: str, user_id: int | None, snapshot_key: str) -> dict | None:
entry = self.state.get_entry(bucket, user_id, expire=True)
if entry:
return entry
snapshot = self._get_review_flow_snapshot(user_id=user_id, snapshot_key=snapshot_key)
if not snapshot:
return None
if snapshot.get("expires_at") and snapshot["expires_at"] < datetime.utcnow():
self._set_review_flow_snapshot(user_id=user_id, snapshot_key=snapshot_key, value=None)
return None
self.state.set_entry(bucket, user_id, snapshot)
return snapshot
def _set_review_flow_entry(
self,
bucket: str,
user_id: int | None,
snapshot_key: str,
value: dict,
*,
active_task: str | None = None,
) -> None:
self.state.set_entry(bucket, user_id, value)
self._set_review_flow_snapshot(
user_id=user_id,
snapshot_key=snapshot_key,
value=value,
active_task=active_task,
)
def _pop_review_flow_entry(
self,
bucket: str,
user_id: int | None,
snapshot_key: str,
*,
active_task: str | None = None,
) -> dict | None:
entry = self.state.pop_entry(bucket, user_id)
self._set_review_flow_snapshot(
user_id=user_id,
snapshot_key=snapshot_key,
value=None,
active_task=active_task,
)
return entry
def _decision_intent(self, turn_decision: dict | None) -> str:
return str((turn_decision or {}).get("intent") or "").strip().lower()
@ -52,7 +158,10 @@ class ReviewFlowMixin:
break
if "data_hora" not in payload:
normalized_datetime = self._normalize_review_datetime_text(message)
normalized_datetime = self.normalizer.normalize_review_datetime_text(
message,
now_provider=self._review_now,
)
if normalized_datetime and normalized_datetime != str(message or "").strip():
payload["data_hora"] = normalized_datetime
@ -105,9 +214,9 @@ class ReviewFlowMixin:
if self.normalizer.extract_hhmm_from_text(message):
return None
if "hoje" in normalized_text:
return datetime.now().strftime("%d/%m/%Y")
return self._review_now().strftime("%d/%m/%Y")
if "amanha" in normalized_text:
return (datetime.now() + timedelta(days=1)).strftime("%d/%m/%Y")
return (self._review_now() + timedelta(days=1)).strftime("%d/%m/%Y")
return None
def _merge_review_base_date_with_time(self, message: str, payload: dict) -> None:
@ -168,6 +277,22 @@ class ReviewFlowMixin:
}
return any(term in normalized_message for term in explicit_review_terms)
def _is_explicit_review_reuse_request(self, message: str) -> bool:
normalized_message = self._normalize_text(message).strip()
reuse_terms = {
"reutilizar",
"reaproveitar",
"usar de novo",
"usar novamente",
"mesmo carro",
"ultimo carro",
"ultimo veiculo",
"ultimo veículo",
}
if not any(term in normalized_message for term in reuse_terms):
return False
return any(term in normalized_message for term in {"carro", "veiculo", "veículo", "informacoes", "dados"})
async def _try_handle_review_management(
self,
message: str,
@ -179,9 +304,13 @@ class ReviewFlowMixin:
if user_id is None:
return None
normalized_intents = self._normalize_intents(intents)
draft = self.state.get_entry("pending_review_management_drafts", user_id, expire=True)
schedule_draft = self.state.get_entry("pending_review_drafts", user_id, expire=True)
pending_reuse = self.state.get_entry("pending_review_reuse_confirmations", user_id, expire=True)
draft = self._get_review_flow_entry("pending_review_management_drafts", user_id, "review_management")
schedule_draft = self._get_review_flow_entry("pending_review_drafts", user_id, "review_schedule")
pending_reuse = self._get_review_flow_entry(
"pending_review_reuse_confirmations",
user_id,
"review_reuse_confirmation",
)
decision_intent = self._decision_intent(turn_decision)
inferred_action = self._infer_review_management_action(message=message, extracted_fields=extracted_fields)
normalized_fields = self._normalize_review_management_fields(extracted_fields)
@ -209,7 +338,12 @@ class ReviewFlowMixin:
if (decision_intent == "review_schedule" or normalized_intents.get("review_schedule", False)) and inferred_action is None:
if draft is not None:
self.state.pop_entry("pending_review_management_drafts", user_id)
self._pop_review_flow_entry(
"pending_review_management_drafts",
user_id,
"review_management",
active_task="review_management",
)
draft = None
return None
@ -260,7 +394,13 @@ class ReviewFlowMixin:
draft["payload"].update(extracted)
draft["expires_at"] = datetime.utcnow() + timedelta(minutes=PENDING_REVIEW_DRAFT_TTL_MINUTES)
self.state.set_entry("pending_review_management_drafts", user_id, draft)
self._set_review_flow_entry(
"pending_review_management_drafts",
user_id,
"review_management",
draft,
active_task="review_management",
)
if action == "reschedule":
missing = [field for field in ("protocolo", "nova_data_hora") if field not in draft["payload"]]
@ -280,9 +420,20 @@ class ReviewFlowMixin:
if error.get("retryable") and error.get("field"):
draft["payload"].pop(str(error["field"]), None)
draft["expires_at"] = datetime.utcnow() + timedelta(minutes=PENDING_REVIEW_DRAFT_TTL_MINUTES)
self.state.set_entry("pending_review_management_drafts", user_id, draft)
self._set_review_flow_entry(
"pending_review_management_drafts",
user_id,
"review_management",
draft,
active_task="review_management",
)
return self._http_exception_detail(exc)
self.state.pop_entry("pending_review_management_drafts", user_id)
self._pop_review_flow_entry(
"pending_review_management_drafts",
user_id,
"review_management",
active_task="review_management",
)
return self._fallback_format_tool_result("editar_data_revisao", tool_result)
missing = [field for field in ("protocolo",) if field not in draft["payload"]]
@ -302,9 +453,20 @@ class ReviewFlowMixin:
if error.get("retryable") and error.get("field"):
draft["payload"].pop(str(error["field"]), None)
draft["expires_at"] = datetime.utcnow() + timedelta(minutes=PENDING_REVIEW_DRAFT_TTL_MINUTES)
self.state.set_entry("pending_review_management_drafts", user_id, draft)
self._set_review_flow_entry(
"pending_review_management_drafts",
user_id,
"review_management",
draft,
active_task="review_management",
)
return self._http_exception_detail(exc)
self.state.pop_entry("pending_review_management_drafts", user_id)
self._pop_review_flow_entry(
"pending_review_management_drafts",
user_id,
"review_management",
active_task="review_management",
)
return self._fallback_format_tool_result("cancelar_agendamento_revisao", tool_result)
def _render_missing_review_fields_prompt(self, missing_fields: list[str]) -> str:
@ -410,19 +572,40 @@ class ReviewFlowMixin:
return None
if has_management_intent:
self.state.pop_entry("pending_review_drafts", user_id)
self.state.pop_entry("pending_review_reuse_confirmations", user_id)
self._pop_review_flow_entry(
"pending_review_drafts",
user_id,
"review_schedule",
active_task="review_schedule",
)
self._pop_review_flow_entry(
"pending_review_reuse_confirmations",
user_id,
"review_reuse_confirmation",
)
return None
draft = self.state.get_entry("pending_review_drafts", user_id, expire=True)
draft = self._get_review_flow_entry("pending_review_drafts", user_id, "review_schedule")
extracted = self._normalize_review_fields(extracted_fields)
pending_reuse = self.state.get_entry("pending_review_reuse_confirmations", user_id, expire=True)
pending_confirmation = self.state.get_entry("pending_review_confirmations", user_id, expire=True)
pending_reuse = self._get_review_flow_entry(
"pending_review_reuse_confirmations",
user_id,
"review_reuse_confirmation",
)
pending_confirmation = self._get_review_flow_entry(
"pending_review_confirmations",
user_id,
"review_confirmation",
)
active_review_context = self._active_domain(user_id) == "review"
review_flow_source = "draft" if draft else None
if has_intent and draft is None and pending_confirmation and not self._is_affirmative_message(message):
self.state.pop_entry("pending_review_confirmations", user_id)
self._pop_review_flow_entry(
"pending_review_confirmations",
user_id,
"review_confirmation",
)
pending_confirmation = None
if pending_reuse:
@ -432,14 +615,24 @@ class ReviewFlowMixin:
if date_only and not has_explicit_time:
extracted.pop("data_hora", None)
if self._is_negative_message(message):
self.state.pop_entry("pending_review_reuse_confirmations", user_id)
self._pop_review_flow_entry(
"pending_review_reuse_confirmations",
user_id,
"review_reuse_confirmation",
)
pending_reuse = None
if not extracted:
draft = {
"payload": {},
"expires_at": datetime.utcnow() + timedelta(minutes=PENDING_REVIEW_DRAFT_TTL_MINUTES),
}
self.state.set_entry("pending_review_drafts", user_id, draft)
self._set_review_flow_entry(
"pending_review_drafts",
user_id,
"review_schedule",
draft,
active_task="review_schedule",
)
self._log_review_flow_source(
source="last_review_package",
payload=draft["payload"],
@ -464,11 +657,21 @@ class ReviewFlowMixin:
else:
for key, value in seed_payload.items():
draft["payload"].setdefault(key, value)
self.state.pop_entry("pending_review_reuse_confirmations", user_id)
self._pop_review_flow_entry(
"pending_review_reuse_confirmations",
user_id,
"review_reuse_confirmation",
)
review_flow_source = "last_review_package"
if date_only and not extracted.get("data_hora"):
draft["payload"]["data_hora_base"] = date_only
self.state.set_entry("pending_review_drafts", user_id, draft)
self._set_review_flow_entry(
"pending_review_drafts",
user_id,
"review_schedule",
draft,
active_task="review_schedule",
)
self._log_review_flow_source(
source=review_flow_source,
payload=draft["payload"],
@ -476,16 +679,33 @@ class ReviewFlowMixin:
)
return f"Perfeito. Tenho a data {date_only}. Agora me informe o horario desejado para a revisao."
if "data_hora" not in extracted:
self.state.set_entry("pending_review_drafts", user_id, draft)
self._set_review_flow_entry(
"pending_review_drafts",
user_id,
"review_schedule",
draft,
active_task="review_schedule",
)
self._log_review_flow_source(source=review_flow_source, payload=draft["payload"], missing_fields=["data_hora"])
return "Perfeito. Me informe apenas a data e hora desejada para a revisao."
if has_intent and draft is None and not extracted:
last_package = self._get_last_review_package(user_id=user_id)
if last_package:
self.state.set_entry(
explicit_reuse_request = self._is_explicit_review_reuse_request(message)
active_context_reuse_request = (
active_review_context
and draft is None
and self._should_bootstrap_review_from_active_context(message=message, payload=extracted)
)
should_offer_reuse = bool(last_package) and not pending_reuse and (
(has_intent and draft is None)
or explicit_reuse_request
or active_context_reuse_request
)
if should_offer_reuse:
self._set_review_flow_entry(
"pending_review_reuse_confirmations",
user_id,
"review_reuse_confirmation",
{
"payload": last_package,
"expires_at": datetime.utcnow() + timedelta(minutes=PENDING_REVIEW_DRAFT_TTL_MINUTES),
@ -504,7 +724,12 @@ class ReviewFlowMixin:
)
and not extracted
):
self.state.pop_entry("pending_review_drafts", user_id)
self._pop_review_flow_entry(
"pending_review_drafts",
user_id,
"review_schedule",
active_task="review_schedule",
)
return None
bootstrap_payload = dict(extracted)
@ -544,7 +769,13 @@ class ReviewFlowMixin:
elif self._is_negative_message(message):
draft["payload"]["revisao_previa_concessionaria"] = False
draft["expires_at"] = datetime.utcnow() + timedelta(minutes=PENDING_REVIEW_DRAFT_TTL_MINUTES)
self.state.set_entry("pending_review_drafts", user_id, draft)
self._set_review_flow_entry(
"pending_review_drafts",
user_id,
"review_schedule",
draft,
active_task="review_schedule",
)
missing = [field for field in REVIEW_REQUIRED_FIELDS if field not in draft["payload"]]
if missing:
@ -573,11 +804,22 @@ class ReviewFlowMixin:
if error.get("retryable") and error.get("field"):
draft["payload"].pop(str(error["field"]), None)
draft["expires_at"] = datetime.utcnow() + timedelta(minutes=PENDING_REVIEW_DRAFT_TTL_MINUTES)
self.state.set_entry("pending_review_drafts", user_id, draft)
self._set_review_flow_entry(
"pending_review_drafts",
user_id,
"review_schedule",
draft,
active_task="review_schedule",
)
self._log_review_flow_source(source=review_flow_source or "draft", payload=draft["payload"])
return self._http_exception_detail(exc)
self.state.pop_entry("pending_review_drafts", user_id)
self._pop_review_flow_entry(
"pending_review_drafts",
user_id,
"review_schedule",
active_task="review_schedule",
)
self._store_last_review_package(user_id=user_id, payload=draft["payload"])
self._log_review_flow_source(source=review_flow_source or "draft", payload=draft["payload"])
return self._fallback_format_tool_result("agendar_revisao", tool_result)

@ -335,6 +335,9 @@ class ConversationPolicy:
decision_intent = self._decision_intent(turn_decision)
if decision_domain in {"review", "sales"} or decision_intent not in {"", "general"}:
return True
return self.looks_like_fresh_operational_request_from_text(message)
def looks_like_fresh_operational_request_from_text(self, message: str) -> bool:
normalized = self.service.normalizer.normalize_text(message).strip()
if len(normalized) < 15:
return False
@ -384,6 +387,14 @@ class ConversationPolicy:
if user_id is None or self.is_explicit_flow_cancel_message(message):
return False
context = self.service._get_user_context(user_id)
active_domain = str((context or {}).get("active_domain") or "general")
if (
self.looks_like_fresh_operational_request_from_text(message)
and not self.has_open_flow(user_id=user_id, domain=active_domain)
):
return True
pending_cancel_order = self.service.state.get_entry("pending_cancel_order_drafts", user_id, expire=True)
if pending_cancel_order:
payload = pending_cancel_order.get("payload", {})
@ -640,6 +651,23 @@ class ConversationPolicy:
return True
return self.service._is_affirmative_message(message) or self.service._is_negative_message(message)
def is_simple_confirmation_message(self, message: str) -> bool:
normalized = self.strip_choice_message(self.service.normalizer.normalize_text(message))
return normalized in {
"sim",
"nao",
"não",
"ok",
"pode",
"confirmo",
"aceito",
"fechado",
"pode sim",
"tenho",
"tenho sim",
"negativo",
}
# Executa o próximo pedido da fila quando o usuário disser “continuar”.
def is_continue_queue_message(self, message: str, turn_decision: dict | None = None) -> bool:
@ -771,6 +799,11 @@ class ConversationPolicy:
return None
if not self.has_open_flow(user_id=user_id, domain=current_domain):
return None
if (
self.is_simple_confirmation_message(message)
and not self.looks_like_fresh_operational_request_from_text(message)
):
return None
context["pending_switch"] = {
"source_domain": current_domain,
@ -808,8 +841,18 @@ class ConversationPolicy:
#Mensagem exibida após a troca acontecer.
def render_domain_onboarding_prompt(self, target_domain: str) -> str:
if target_domain == "sales":
return "Pode me dizer a faixa de preco, o modelo ou o tipo de carro que voce procura."
if target_domain == "review":
return "Pode me informar a placa ou, se preferir, ja mandar placa, data/hora, modelo, ano, km e se ja fez revisao."
return "Pode me dizer o que voce quer fazer agora?"
def render_context_switched_message(self, target_domain: str) -> str:
return f"Certo, contexto anterior encerrado. Vamos seguir com {self.domain_label(target_domain)}."
return (
f"Certo, contexto anterior encerrado. Vamos seguir com {self.domain_label(target_domain)}.\n"
f"{self.render_domain_onboarding_prompt(target_domain)}"
)
# Serve para depuração, observabilidade ou até para alimentar outro componente com um resumo do estado atual.

@ -15,6 +15,7 @@ class ConversationStateStore(ConversationStateRepository):
self.pending_review_reuse_confirmations: dict[int, dict] = {}
self.pending_order_drafts: dict[int, dict] = {}
self.pending_cancel_order_drafts: dict[int, dict] = {}
self.pending_stock_selections: dict[int, dict] = {}
def upsert_user_context(self, user_id: int | None, ttl_minutes: int) -> None:
if user_id is None:
@ -26,8 +27,12 @@ class ConversationStateStore(ConversationStateRepository):
return
self.user_contexts[user_id] = {
"active_domain": "general",
"active_task": None,
"generic_memory": {},
"shared_memory": {},
"collected_slots": {},
"flow_snapshots": {},
"last_tool_result": None,
"order_queue": [],
"pending_order_selection": None,
"pending_switch": None,

@ -15,6 +15,12 @@ logger = logging.getLogger(__name__)
# A semantica conversacional idealmente vem do modelo, nao daqui.
class EntityNormalizer:
_TOOL_NAME_ALIASES = {
"fazer_pedido": "realizar_pedido",
"fazer pedido": "realizar_pedido",
"criar_pedido": "realizar_pedido",
"criar pedido": "realizar_pedido",
"place_order": "realizar_pedido",
"create_order": "realizar_pedido",
"marcar_revisao": "agendar_revisao",
"agendar revisao": "agendar_revisao",
"schedule_review": "agendar_revisao",
@ -485,8 +491,8 @@ class EntityNormalizer:
def extract_hhmm_from_text(self, text: str) -> str | None:
return technical_normalizer.extract_hhmm_from_text(text)
def normalize_review_datetime_text(self, value) -> str | None:
return technical_normalizer.normalize_review_datetime_text(value)
def normalize_review_datetime_text(self, value, now_provider=None) -> str | None:
return technical_normalizer.normalize_review_datetime_text(value, now_provider=now_provider)
def normalize_generic_fields(self, data) -> dict:
if not isinstance(data, dict):

@ -191,6 +191,7 @@ class MessagePlanner:
"- 'entities' deve manter as secoes generic_memory, review_fields, review_management_fields, order_fields e cancel_order_fields.\n"
"- Em pedidos de compra com faixa de preco ou orcamento (ex.: '70 mil', 'ate 50 mil', 'R$ 45000'), preencha entities.generic_memory.orcamento_max.\n"
"- Em pedidos com tipo de carro (ex.: suv, sedan, hatch, pickup), preencha entities.generic_memory.perfil_veiculo.\n"
"- Se o usuario quiser efetivar a compra de um veiculo, use intent='order_create', domain='sales' e prefira tool_name='realizar_pedido'.\n"
"- Se o usuario quiser listar os pedidos dele, use intent='order_list', domain='sales', action='call_tool' e tool_name='listar_pedidos'.\n"
"- Se o usuario quiser listar agendamentos de revisao, use intent='review_list', domain='review', action='call_tool' e tool_name='listar_agendamentos_revisao'.\n"
"- Se o usuario quiser cancelar um agendamento de revisao, use intent='review_cancel', domain='review' e prefira tool_name='cancelar_agendamento_revisao'.\n"

@ -79,6 +79,29 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin):
return final_response
self._upsert_user_context(user_id=user_id)
if hasattr(self, "policy") and self._is_order_selection_reset_message(message):
reset_override = await self._try_handle_immediate_context_reset(
message=message,
user_id=user_id,
turn_decision={"action": "clear_context"},
finish=finish,
)
if reset_override:
return reset_override
pending_stock_selection_follow_up = await self._try_handle_pending_stock_selection_follow_up(
message=message,
user_id=user_id,
finish=finish,
)
if pending_stock_selection_follow_up:
return pending_stock_selection_follow_up
active_sales_follow_up = await self._try_handle_active_sales_follow_up(
message=message,
user_id=user_id,
finish=finish,
)
if active_sales_follow_up:
return active_sales_follow_up
# Faz uma leitura inicial do turno para ajudar a policy
# com fila, troca de contexto e comandos globais.
early_turn_decision = await self._extract_turn_decision_with_llm(
@ -187,9 +210,26 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin):
llm_generic_fields=sales_search_context,
)
should_prioritize_review_flow = self._should_prioritize_review_flow(
turn_decision=turn_decision,
extracted_entities=extracted_entities,
user_id=user_id,
)
should_prioritize_order_flow = self._should_prioritize_order_flow(
turn_decision=turn_decision,
extracted_entities=extracted_entities,
user_id=user_id,
message=routing_message,
)
domain_hint = self._domain_from_turn_decision(turn_decision)
if domain_hint == "general":
domain_hint = self._domain_from_intents(extracted_entities.get("intents", {}))
if self._should_consume_sales_follow_up_in_active_flow(
message=routing_message,
user_id=user_id,
extracted_entities=extracted_entities,
):
domain_hint = "sales"
context_switch_response = self._handle_context_switch(
message=routing_message,
user_id=user_id,
@ -214,16 +254,6 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin):
decision_action = str(turn_decision.get("action") or "")
decision_response = str(turn_decision.get("response_to_user") or "").strip()
should_prioritize_review_flow = self._should_prioritize_review_flow(
turn_decision=turn_decision,
extracted_entities=extracted_entities,
user_id=user_id,
)
should_prioritize_order_flow = self._should_prioritize_order_flow(
turn_decision=turn_decision,
extracted_entities=extracted_entities,
user_id=user_id,
)
if (
decision_action == "ask_missing_fields"
and decision_response
@ -239,6 +269,7 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin):
):
return await finish(decision_response, queue_notice=queue_notice)
if not should_prioritize_order_flow:
planned_tool_response = await self._try_execute_business_tool_from_turn_decision(
message=routing_message,
user_id=user_id,
@ -356,8 +387,9 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin):
)
if stock_suggestion_response:
return await finish(stock_suggestion_response, queue_notice=queue_notice)
self._capture_tool_result_context(
self._capture_successful_tool_side_effects(
tool_name=tool_name,
arguments=arguments,
tool_result=tool_result,
user_id=user_id,
)
@ -496,6 +528,107 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin):
queue_notice=queue_notice,
)
async def _try_handle_pending_stock_selection_follow_up(
self,
message: str,
user_id: int | None,
finish,
) -> str | None:
if user_id is None:
return None
pending_selection = self.state.get_entry("pending_stock_selections", user_id, expire=True)
if not pending_selection:
return None
if not self._should_bootstrap_order_from_context(
message=message,
user_id=user_id,
payload={},
):
return None
response = await self._try_collect_and_create_order(
message=message,
user_id=user_id,
extracted_fields={},
intents={},
turn_decision={
"intent": "order_create",
"domain": "sales",
"action": "collect_order_create",
},
)
if not response:
return None
return await finish(response)
async def _try_handle_active_sales_follow_up(
self,
message: str,
user_id: int | None,
finish,
) -> str | None:
if user_id is None:
return None
context = self._get_user_context(user_id)
if not isinstance(context, dict):
return None
if str(context.get("active_domain") or "").strip().lower() != "sales":
return None
normalized_message = self.normalizer.normalize_text(message).strip()
if self._looks_like_explicit_domain_shift_request(normalized_message):
return None
pending_order_draft = self.state.get_entry("pending_order_drafts", user_id, expire=True)
if pending_order_draft:
if self._should_restart_open_order_draft(
message=message,
user_id=user_id,
turn_decision={
"intent": "inventory_search" if self._has_stock_listing_request(message) else "order_create",
"domain": "sales",
"action": "collect_order_create",
},
):
self._pop_order_flow_entry(
"pending_order_drafts",
user_id,
"order_create",
active_task="order_create",
)
self._reset_order_stock_context(user_id=user_id)
return None
response = await self._try_collect_and_create_order(
message=message,
user_id=user_id,
extracted_fields={},
intents={},
turn_decision={
"intent": "order_create",
"domain": "sales",
"action": "collect_order_create",
},
)
if response:
return await finish(response)
pending_cancel_order_draft = self.state.get_entry("pending_cancel_order_drafts", user_id, expire=True)
if pending_cancel_order_draft:
response = await self._try_collect_and_cancel_order(
message=message,
user_id=user_id,
extracted_fields={},
intents={},
turn_decision={
"intent": "order_cancel",
"domain": "sales",
"action": "collect_order_cancel",
},
)
if response:
return await finish(response)
return None
async def _try_execute_business_tool_from_turn_decision(
self,
message: str,
@ -537,8 +670,9 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin):
if stock_suggestion_response:
return await finish(stock_suggestion_response, queue_notice=queue_notice)
self._capture_tool_result_context(
self._capture_successful_tool_side_effects(
tool_name=tool_name,
arguments=arguments,
tool_result=tool_result,
user_id=user_id,
)
@ -566,12 +700,41 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin):
self.state.pop_entry("pending_review_confirmations", user_id)
self.state.pop_entry("pending_review_management_drafts", user_id)
self.state.pop_entry("pending_review_reuse_confirmations", user_id)
context = self._get_user_context(user_id)
if isinstance(context, dict):
snapshots = context.get("flow_snapshots")
if isinstance(snapshots, dict):
snapshots.pop("review_schedule", None)
snapshots.pop("review_confirmation", None)
snapshots.pop("review_management", None)
snapshots.pop("review_reuse_confirmation", None)
collected_slots = context.get("collected_slots")
if isinstance(collected_slots, dict):
collected_slots.pop("review_schedule", None)
collected_slots.pop("review_management", None)
if context.get("active_task") in {"review_schedule", "review_management"}:
context["active_task"] = None
self._save_user_context(user_id=user_id, context=context)
def _reset_pending_order_states(self, user_id: int | None) -> None:
if user_id is None:
return
self.state.pop_entry("pending_order_drafts", user_id)
self.state.pop_entry("pending_cancel_order_drafts", user_id)
self.state.pop_entry("pending_stock_selections", user_id)
context = self._get_user_context(user_id)
if isinstance(context, dict):
snapshots = context.get("flow_snapshots")
if isinstance(snapshots, dict):
snapshots.pop("order_create", None)
snapshots.pop("order_cancel", None)
collected_slots = context.get("collected_slots")
if isinstance(collected_slots, dict):
collected_slots.pop("order_create", None)
collected_slots.pop("order_cancel", None)
if context.get("active_task") in {"order_create", "order_cancel"}:
context["active_task"] = None
self._save_user_context(user_id=user_id, context=context)
def _clear_user_conversation_state(self, user_id: int | None) -> None:
context = self._get_user_context(user_id)
@ -579,9 +742,14 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin):
return
self._reset_pending_review_states(user_id=user_id)
self._reset_pending_order_states(user_id=user_id)
self.state.pop_entry("last_review_packages", user_id)
context["active_domain"] = "general"
context["active_task"] = None
context["generic_memory"] = {}
context["shared_memory"] = {}
context["collected_slots"] = {}
context["flow_snapshots"] = {}
context["last_tool_result"] = None
context["order_queue"] = []
context["pending_order_selection"] = None
context["pending_switch"] = None
@ -760,7 +928,12 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin):
context = self._get_user_context(user_id)
if not context:
return
context["last_tool_result"] = {
"tool_name": tool_name,
"result_type": type(tool_result).__name__,
}
if tool_name != "consultar_estoque" or not isinstance(tool_result, list):
self._save_user_context(user_id=user_id, context=context)
return
sanitized: list[dict] = []
@ -778,14 +951,31 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin):
"modelo": str(item.get("modelo") or "").strip(),
"categoria": str(item.get("categoria") or "").strip(),
"preco": preco,
"budget_relaxed": bool(item.get("budget_relaxed", False)),
}
)
context["last_stock_results"] = sanitized
self._store_pending_stock_selection(user_id=user_id, stock_results=sanitized)
if sanitized:
context["selected_vehicle"] = None
self._save_user_context(user_id=user_id, context=context)
def _capture_successful_tool_side_effects(
self,
tool_name: str,
arguments: dict | None,
tool_result,
user_id: int | None,
) -> None:
if tool_name == "agendar_revisao" and isinstance(arguments, dict):
self._store_last_review_package(user_id=user_id, payload=arguments)
self._capture_tool_result_context(
tool_name=tool_name,
tool_result=tool_result,
user_id=user_id,
)
async def _maybe_build_stock_suggestion_response(
self,
tool_name: str,
@ -831,6 +1021,8 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin):
if not nearby:
return None
nearby = [{**item, "budget_relaxed": True} for item in nearby]
self._capture_tool_result_context(
tool_name="consultar_estoque",
tool_result=nearby,
@ -931,7 +1123,23 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin):
else message
)
self._clear_user_conversation_state(user_id=user_id)
if not cleaned_message or cleaned_message.strip() == (message or "").strip():
normalized_cleaned = self.normalizer.normalize_text(cleaned_message).strip(" ,.:;-")
if (
not cleaned_message
or cleaned_message.strip() == (message or "").strip()
or normalized_cleaned in {
"",
"e",
"agora",
"e agora",
"e vamos recomecar",
"vamos recomecar",
"recomecar",
"e vamos comecar de novo",
"vamos comecar de novo",
"comecar de novo",
}
):
return await finish("Contexto da conversa limpo. Podemos recomecar do zero.")
return await self.handle_message(cleaned_message, user_id=user_id)
@ -995,6 +1203,7 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin):
turn_decision: dict | None,
extracted_entities: dict | None,
user_id: int | None = None,
message: str | None = None,
) -> bool:
decision = turn_decision or {}
decision_intent = str(decision.get("intent") or "").strip().lower()
@ -1003,10 +1212,20 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin):
)
if has_open_cancel_order_draft:
return True
entities = extracted_entities if isinstance(extracted_entities, dict) else {}
order_fields = entities.get("order_fields")
if not isinstance(order_fields, dict):
order_fields = {}
if user_id is not None and message and self._should_bootstrap_order_from_context(
message=message,
user_id=user_id,
payload=order_fields,
):
return True
if decision_intent == "order_list":
return True
if decision_intent == "order_cancel":
cancel_order_fields = (extracted_entities if isinstance(extracted_entities, dict) else {}).get("cancel_order_fields")
cancel_order_fields = entities.get("cancel_order_fields")
if not isinstance(cancel_order_fields, dict):
cancel_order_fields = {}
return bool(
@ -1017,13 +1236,9 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin):
if decision_intent != "order_create":
return False
entities = extracted_entities if isinstance(extracted_entities, dict) else {}
generic_memory = entities.get("generic_memory")
order_fields = entities.get("order_fields")
if not isinstance(generic_memory, dict):
generic_memory = {}
if not isinstance(order_fields, dict):
order_fields = {}
return any(
(
@ -1035,6 +1250,57 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin):
)
)
def _should_consume_sales_follow_up_in_active_flow(
self,
message: str,
user_id: int | None,
extracted_entities: dict | None,
) -> bool:
if user_id is None:
return False
context = self._get_user_context(user_id)
if not isinstance(context, dict):
return False
if str(context.get("active_domain") or "").strip().lower() != "sales":
return False
if not (
self.state.get_entry("pending_order_drafts", user_id, expire=True)
or self.state.get_entry("pending_cancel_order_drafts", user_id, expire=True)
or self.state.get_entry("pending_stock_selections", user_id, expire=True)
):
return False
entities = extracted_entities if isinstance(extracted_entities, dict) else {}
order_fields = entities.get("order_fields")
if not isinstance(order_fields, dict):
order_fields = {}
if order_fields.get("cpf"):
return True
if self._normalize_cpf(message):
return True
return self._should_bootstrap_order_from_context(
message=message,
user_id=user_id,
payload=order_fields,
)
def _looks_like_explicit_domain_shift_request(self, normalized_message: str) -> bool:
if not normalized_message:
return False
shift_terms = (
"quero agendar",
"quero marcar",
"quero uma revisao",
"agendar uma revisao",
"marcar uma revisao",
"revisao",
"agendamento",
"cancelar revisao",
"remarcar revisao",
)
return any(term in normalized_message for term in shift_terms)
def _should_prioritize_review_flow(
self,
turn_decision: dict | None,

@ -32,8 +32,12 @@ class RedisConversationStateRepository(ConversationStateRepository):
else:
context = {
"active_domain": "general",
"active_task": None,
"generic_memory": {},
"shared_memory": {},
"collected_slots": {},
"flow_snapshots": {},
"last_tool_result": None,
"order_queue": [],
"pending_order_selection": None,
"pending_switch": None,

@ -101,6 +101,7 @@ class FakeRegistry:
2: ("Toyota Corolla 2020", 58476.0),
3: ("Chevrolet Onix 2022", 51809.0),
7: ("Fiat Argo 2020", 61857.0),
9: ("Hyundai HB20S 2022", 76000.0),
}
modelo_veiculo, valor_veiculo = vehicle_map[arguments["vehicle_id"]]
return {
@ -236,13 +237,14 @@ class OrderFlowHarness(OrderFlowMixin):
class ReviewFlowHarness(ReviewFlowMixin):
def __init__(self, state, registry):
def __init__(self, state, registry, review_now_provider=None):
self.state = state
self.registry = registry
self.tool_executor = registry
self.normalizer = EntityNormalizer()
self.captured_suggestions = []
self.logged_events = []
self._review_now_provider = review_now_provider
def _normalize_intents(self, data) -> dict:
return self.normalizer.normalize_intents(data)
@ -266,6 +268,11 @@ class ReviewFlowHarness(ReviewFlowMixin):
def _get_user_context(self, user_id: int | None):
return self.state.get_user_context(user_id)
def _save_user_context(self, user_id: int | None, context: dict | None) -> None:
if user_id is None or not isinstance(context, dict):
return
self.state.save_user_context(user_id, context)
def _fallback_format_tool_result(self, tool_name: str, tool_result) -> str:
return f"{tool_name}:{tool_result}"
@ -350,6 +357,21 @@ class ConversationAdjustmentsTests(unittest.TestCase):
self.assertTrue(policy.should_defer_flow_cancellation_control("nao", user_id=7))
self.assertFalse(policy.should_defer_flow_cancellation_control("cancelar fluxo atual", user_id=7))
def test_defer_flow_cancel_for_fresh_sales_request_without_open_flow(self):
state = FakeState(
contexts={
7: {
"active_domain": "sales",
"pending_order_selection": None,
"pending_switch": None,
}
}
)
policy = ConversationPolicy(service=FakeService(state))
self.assertTrue(policy.should_defer_flow_cancellation_control("agora eu quero comprar um carro de ate 70 mil", user_id=7))
self.assertFalse(policy.should_defer_flow_cancellation_control("cancelar fluxo atual", user_id=7))
def test_normalize_datetime_connector_accepts_as_com_acento(self):
normalizer = EntityNormalizer()
@ -521,6 +543,46 @@ class CancelOrderFlowTests(unittest.IsolatedAsyncioTestCase):
self.assertIsNone(response)
self.assertEqual(registry.calls, [])
async def test_cancel_order_flow_restores_draft_from_context_snapshot_when_bucket_is_missing(self):
state = FakeState(
contexts={
42: {
"active_domain": "sales",
"active_task": "order_cancel",
"generic_memory": {},
"shared_memory": {},
"collected_slots": {
"order_cancel": {"numero_pedido": "PED-20260305120000-ABC123"},
},
"flow_snapshots": {
"order_cancel": {
"payload": {"numero_pedido": "PED-20260305120000-ABC123"},
"expires_at": datetime.utcnow() + timedelta(minutes=30),
}
},
"last_stock_results": [],
"selected_vehicle": None,
}
}
)
registry = FakeRegistry()
flow = OrderFlowHarness(state=state, registry=registry)
response = await flow._try_collect_and_cancel_order(
message="desisti da compra",
user_id=42,
extracted_fields={},
intents={},
)
self.assertEqual(len(registry.calls), 1)
tool_name, arguments, tool_user_id = registry.calls[0]
self.assertEqual(tool_name, "cancelar_pedido")
self.assertEqual(tool_user_id, 42)
self.assertEqual(arguments["numero_pedido"], "PED-20260305120000-ABC123")
self.assertEqual(arguments["motivo"], "desisti da compra")
self.assertIn("Status: Cancelado", response)
class CreateOrderFlowWithVehicleTests(unittest.IsolatedAsyncioTestCase):
async def test_order_listing_preserves_open_order_draft(self):
@ -706,6 +768,56 @@ class CreateOrderFlowWithVehicleTests(unittest.IsolatedAsyncioTestCase):
self.assertIn("Honda Civic 2021", response)
self.assertEqual(len(flow._get_last_stock_results(user_id=10)), 2)
async def test_order_flow_restarts_open_draft_when_user_requests_new_budget_search(self):
state = FakeState(
entries={
"pending_order_drafts": {
10: {
"payload": {
"cpf": "12345678909",
"vehicle_id": 15,
"modelo_veiculo": "Volkswagen T-Cross 2022",
"valor_veiculo": 73224.0,
},
"expires_at": datetime.utcnow() + timedelta(minutes=30),
}
}
},
contexts={
10: {
"active_domain": "sales",
"generic_memory": {"cpf": "12345678909", "orcamento_max": 70000, "perfil_veiculo": ["suv"]},
"shared_memory": {"cpf": "12345678909", "orcamento_max": 70000, "perfil_veiculo": ["suv"]},
"last_stock_results": [
{"id": 15, "modelo": "Volkswagen T-Cross 2022", "categoria": "suv", "preco": 73224.0, "budget_relaxed": True},
],
"selected_vehicle": {"id": 15, "modelo": "Volkswagen T-Cross 2022", "categoria": "suv", "preco": 73224.0, "budget_relaxed": True},
}
},
)
registry = FakeRegistry()
flow = OrderFlowHarness(state=state, registry=registry)
async def fake_hydrate_mock_customer_from_cpf(cpf: str, user_id: int | None = None):
return {"cpf": cpf, "user_id": user_id}
with patch(
"app.services.flows.order_flow.hydrate_mock_customer_from_cpf",
new=fake_hydrate_mock_customer_from_cpf,
):
response = await flow._try_collect_and_create_order(
message="agora quero comprar um carro de ate 60 mil",
user_id=10,
extracted_fields={},
intents={},
turn_decision={"intent": "order_create", "domain": "sales", "action": "collect_order_create"},
)
self.assertEqual(registry.calls[0][0], "consultar_estoque")
self.assertIn("Encontrei 2 veiculo(s):", response)
self.assertEqual(state.get_user_context(10)["generic_memory"]["orcamento_max"], 60000)
self.assertIsNone(state.get_user_context(10)["selected_vehicle"])
async def test_order_flow_accepts_turn_decision_without_legacy_intents(self):
state = FakeState(
contexts={
@ -933,6 +1045,249 @@ class CreateOrderFlowWithVehicleTests(unittest.IsolatedAsyncioTestCase):
self.assertEqual(arguments["vehicle_id"], 2)
self.assertEqual(arguments["cpf"], "12345678909")
self.assertIn("Veiculo: Toyota Corolla 2020", response)
self.assertEqual(state.get_user_context(10).get("selected_vehicle"), None)
self.assertEqual(state.get_user_context(10).get("last_stock_results"), [])
self.assertEqual(state.get_user_context(10).get("pending_single_vehicle_confirmation"), None)
async def test_order_flow_reuses_last_stock_results_when_user_requests_order_by_model_name(self):
state = FakeState(
contexts={
10: {
"generic_memory": {},
"last_stock_results": [
{"id": 9, "modelo": "Hyundai HB20S 2022", "categoria": "sedan", "preco": 76000.0},
{"id": 2, "modelo": "Toyota Corolla 2020", "categoria": "hatch", "preco": 58476.0},
],
"selected_vehicle": None,
}
}
)
registry = FakeRegistry()
flow = OrderFlowHarness(state=state, registry=registry)
first_response = await flow._try_collect_and_create_order(
message="Quero fazer o pedido do Hyundai HB20S 2022",
user_id=10,
extracted_fields={"modelo_veiculo": "Hyundai HB20S 2022"},
intents={"order_create": True},
)
self.assertIn("cpf do cliente", first_response.lower())
self.assertEqual(registry.calls, [])
self.assertEqual(state.get_user_context(10)["selected_vehicle"]["id"], 9)
async def fake_hydrate_mock_customer_from_cpf(cpf: str, user_id: int | None = None):
return {"cpf": cpf, "user_id": user_id}
with patch(
"app.services.flows.order_flow.hydrate_mock_customer_from_cpf",
new=fake_hydrate_mock_customer_from_cpf,
):
second_response = await flow._try_collect_and_create_order(
message="12345678909",
user_id=10,
extracted_fields={},
intents={},
)
self.assertEqual(len(registry.calls), 1)
tool_name, arguments, tool_user_id = registry.calls[0]
self.assertEqual(tool_name, "realizar_pedido")
self.assertEqual(tool_user_id, 10)
self.assertEqual(arguments["vehicle_id"], 9)
self.assertEqual(arguments["cpf"], "12345678909")
self.assertIn("Veiculo: Hyundai HB20S 2022", second_response)
async def test_order_flow_bootstraps_selection_from_last_stock_results_without_repeating_order_verb(self):
state = FakeState(
contexts={
10: {
"active_domain": "sales",
"generic_memory": {"orcamento_max": 70000, "perfil_veiculo": ["suv"]},
"shared_memory": {"orcamento_max": 70000, "perfil_veiculo": ["suv"]},
"last_stock_results": [
{"id": 7, "modelo": "Fiat Argo 2020", "categoria": "suv", "preco": 61857.0},
{"id": 3, "modelo": "Chevrolet Onix 2022", "categoria": "suv", "preco": 51809.0},
],
"selected_vehicle": None,
}
}
)
registry = FakeRegistry()
flow = OrderFlowHarness(state=state, registry=registry)
first_response = await flow._try_collect_and_create_order(
message="quero a opcao 1",
user_id=10,
extracted_fields={},
intents={},
)
self.assertIn("cpf do cliente", first_response.lower())
self.assertEqual(registry.calls, [])
self.assertEqual(state.get_user_context(10)["selected_vehicle"]["id"], 7)
async def fake_hydrate_mock_customer_from_cpf(cpf: str, user_id: int | None = None):
return {"cpf": cpf, "user_id": user_id}
with patch(
"app.services.flows.order_flow.hydrate_mock_customer_from_cpf",
new=fake_hydrate_mock_customer_from_cpf,
):
second_response = await flow._try_collect_and_create_order(
message="12345678909",
user_id=10,
extracted_fields={},
intents={},
)
self.assertEqual(len(registry.calls), 1)
self.assertEqual(registry.calls[0][0], "realizar_pedido")
self.assertEqual(registry.calls[0][1]["vehicle_id"], 7)
self.assertEqual(registry.calls[0][1]["cpf"], "12345678909")
self.assertIn("Veiculo: Fiat Argo 2020", second_response)
async def test_order_flow_reads_vehicle_selection_from_pending_stock_selection_repository_entry(self):
state = FakeState(
entries={
"pending_stock_selections": {
10: {
"payload": [
{"id": 7, "modelo": "Fiat Argo 2020", "categoria": "suv", "preco": 61857.0},
{"id": 3, "modelo": "Chevrolet Onix 2022", "categoria": "suv", "preco": 51809.0},
],
"expires_at": datetime.utcnow() + timedelta(minutes=30),
}
}
},
contexts={
10: {
"active_domain": "sales",
"generic_memory": {},
"shared_memory": {},
"last_stock_results": [],
"selected_vehicle": None,
}
}
)
registry = FakeRegistry()
flow = OrderFlowHarness(state=state, registry=registry)
response = await flow._try_collect_and_create_order(
message="quero a opcao 1",
user_id=10,
extracted_fields={},
intents={},
)
self.assertIn("cpf do cliente", response.lower())
self.assertEqual(registry.calls, [])
self.assertEqual(state.get_user_context(10)["selected_vehicle"]["id"], 7)
async def test_order_flow_keeps_relaxed_budget_suggestion_selected_across_follow_up(self):
state = FakeState(
entries={
"pending_stock_selections": {
10: {
"payload": [
{"id": 9, "modelo": "Hyundai HB20S 2022", "categoria": "suv", "preco": 76000.0, "budget_relaxed": True},
{"id": 2, "modelo": "Toyota Corolla 2020", "categoria": "suv", "preco": 58476.0, "budget_relaxed": True},
],
"expires_at": datetime.utcnow() + timedelta(minutes=30),
}
}
},
contexts={
10: {
"active_domain": "sales",
"generic_memory": {"orcamento_max": 70000, "perfil_veiculo": ["suv"]},
"shared_memory": {"orcamento_max": 70000, "perfil_veiculo": ["suv"]},
"last_stock_results": [],
"selected_vehicle": None,
}
}
)
registry = FakeRegistry()
flow = OrderFlowHarness(state=state, registry=registry)
first_response = await flow._try_collect_and_create_order(
message="1",
user_id=10,
extracted_fields={},
intents={},
)
self.assertIn("cpf do cliente", first_response.lower())
self.assertEqual(state.get_user_context(10)["selected_vehicle"]["id"], 9)
async def fake_hydrate_mock_customer_from_cpf(cpf: str, user_id: int | None = None):
return {"cpf": cpf, "user_id": user_id}
with patch(
"app.services.flows.order_flow.hydrate_mock_customer_from_cpf",
new=fake_hydrate_mock_customer_from_cpf,
):
second_response = await flow._try_collect_and_create_order(
message="12345678909",
user_id=10,
extracted_fields={},
intents={},
)
self.assertEqual(len(registry.calls), 1)
self.assertEqual(registry.calls[0][0], "realizar_pedido")
self.assertEqual(registry.calls[0][1]["vehicle_id"], 9)
self.assertEqual(registry.calls[0][1]["cpf"], "12345678909")
self.assertIn("Veiculo: Hyundai HB20S 2022", second_response)
async def test_order_flow_restores_draft_from_context_snapshot_when_bucket_is_missing(self):
state = FakeState(
contexts={
10: {
"active_domain": "sales",
"active_task": "order_create",
"generic_memory": {},
"shared_memory": {},
"collected_slots": {
"order_create": {"vehicle_id": 2, "modelo_veiculo": "Toyota Corolla 2020"},
},
"flow_snapshots": {
"order_create": {
"payload": {"vehicle_id": 2, "modelo_veiculo": "Toyota Corolla 2020"},
"expires_at": datetime.utcnow() + timedelta(minutes=30),
}
},
"last_stock_results": [
{"id": 2, "modelo": "Toyota Corolla 2020", "categoria": "hatch", "preco": 58476.0},
],
"selected_vehicle": {"id": 2, "modelo": "Toyota Corolla 2020", "categoria": "hatch", "preco": 58476.0},
}
}
)
registry = FakeRegistry()
flow = OrderFlowHarness(state=state, registry=registry)
async def fake_hydrate_mock_customer_from_cpf(cpf: str, user_id: int | None = None):
return {"cpf": cpf, "user_id": user_id}
with patch(
"app.services.flows.order_flow.hydrate_mock_customer_from_cpf",
new=fake_hydrate_mock_customer_from_cpf,
):
response = await flow._try_collect_and_create_order(
message="12345678909",
user_id=10,
extracted_fields={},
intents={},
)
self.assertEqual(len(registry.calls), 1)
tool_name, arguments, tool_user_id = registry.calls[0]
self.assertEqual(tool_name, "realizar_pedido")
self.assertEqual(tool_user_id, 10)
self.assertEqual(arguments["vehicle_id"], 2)
self.assertEqual(arguments["cpf"], "12345678909")
self.assertIn("Veiculo: Toyota Corolla 2020", response)
async def test_order_flow_selection_uses_list_position_not_vehicle_id(self):
state = FakeState(
@ -1102,6 +1457,7 @@ class CreateOrderFlowWithVehicleTests(unittest.IsolatedAsyncioTestCase):
class ReviewFlowDraftTests(unittest.IsolatedAsyncioTestCase):
async def test_review_flow_extracts_relative_datetime_from_followup_message(self):
fixed_now = lambda: datetime(2026, 3, 12, 9, 0)
state = FakeState(
entries={
"pending_review_drafts": {
@ -1113,7 +1469,7 @@ class ReviewFlowDraftTests(unittest.IsolatedAsyncioTestCase):
}
)
registry = FakeRegistry()
flow = ReviewFlowHarness(state=state, registry=registry)
flow = ReviewFlowHarness(state=state, registry=registry, review_now_provider=fixed_now)
response = await flow._try_collect_and_schedule_review(
message="Eu gostaria de marcar amanha as 16 horas",
@ -1163,9 +1519,10 @@ class ReviewFlowDraftTests(unittest.IsolatedAsyncioTestCase):
self.assertIn("REV-TESTE-123", response)
async def test_review_flow_keeps_plate_and_datetime_across_incremental_messages(self):
fixed_now = lambda: datetime(2026, 3, 12, 9, 0)
state = FakeState()
registry = FakeRegistry()
flow = ReviewFlowHarness(state=state, registry=registry)
flow = ReviewFlowHarness(state=state, registry=registry, review_now_provider=fixed_now)
await flow._try_collect_and_schedule_review(
message="gostaria de marcar uma nova revisao agora",
@ -1249,6 +1606,60 @@ class ReviewFlowDraftTests(unittest.IsolatedAsyncioTestCase):
any(payload.get("review_flow_source") == "active_domain_fallback" for _, payload in flow.logged_events)
)
async def test_review_flow_restores_draft_from_context_snapshot_when_bucket_is_missing(self):
state = FakeState(
contexts={
21: {
"active_domain": "review",
"active_task": "review_schedule",
"generic_memory": {},
"shared_memory": {},
"collected_slots": {
"review_schedule": {"placa": "A0T1C23", "data_hora": "14/03/2026 18:00"},
},
"flow_snapshots": {
"review_schedule": {
"payload": {"placa": "A0T1C23", "data_hora": "14/03/2026 18:00"},
"expires_at": datetime.utcnow() + timedelta(minutes=30),
}
},
"order_queue": [],
"pending_order_selection": None,
"pending_switch": None,
"last_stock_results": [],
"selected_vehicle": None,
}
}
)
registry = FakeRegistry()
flow = ReviewFlowHarness(state=state, registry=registry)
await flow._try_collect_and_schedule_review(
message="o modelo e Onix e e 2024",
user_id=21,
extracted_fields={"modelo": "Onix", "ano": 2024},
intents={},
turn_decision={"intent": "review_schedule", "domain": "review", "action": "answer_user"},
)
response = await flow._try_collect_and_schedule_review(
message="20000 km, nunca fiz revisao na concessionaria",
user_id=21,
extracted_fields={},
intents={},
turn_decision={"intent": "review_schedule", "domain": "review", "action": "answer_user"},
)
self.assertEqual(registry.calls[0][0], "agendar_revisao")
_, arguments, tool_user_id = registry.calls[0]
self.assertEqual(tool_user_id, 21)
self.assertEqual(arguments.get("placa"), "A0T1C23")
self.assertEqual(arguments.get("data_hora"), "14/03/2026 18:00")
self.assertEqual(arguments.get("modelo"), "Onix")
self.assertEqual(arguments.get("ano"), 2024)
self.assertEqual(arguments.get("km"), 20000)
self.assertFalse(arguments.get("revisao_previa_concessionaria"))
self.assertIn("REV-TESTE-123", response)
async def test_review_flow_offers_reuse_of_last_vehicle_package(self):
state = FakeState(
entries={
@ -1353,6 +1764,131 @@ class ReviewFlowDraftTests(unittest.IsolatedAsyncioTestCase):
self.assertEqual(draft["payload"], {})
self.assertIn("a placa do veiculo", response)
async def test_review_flow_offers_reuse_again_on_next_new_schedule_after_previous_rejection(self):
state = FakeState(
entries={
"last_review_packages": {
21: {
"payload": {
"placa": "A0T1C23",
"modelo": "Onix",
"ano": 2024,
"km": 20000,
"revisao_previa_concessionaria": False,
},
"expires_at": datetime.utcnow() + timedelta(minutes=30),
}
}
}
)
registry = FakeRegistry()
flow = ReviewFlowHarness(state=state, registry=registry)
first_response = await flow._try_collect_and_schedule_review(
message="quero agendar uma revisao",
user_id=21,
extracted_fields={},
intents={},
turn_decision={"intent": "review_schedule", "domain": "review", "action": "collect_review_schedule"},
)
self.assertIn("Posso reutilizar os dados do ultimo veiculo", first_response)
second_response = await flow._try_collect_and_schedule_review(
message="nao",
user_id=21,
extracted_fields={},
intents={},
turn_decision={"intent": "review_schedule", "domain": "review", "action": "collect_review_schedule"},
)
self.assertIn("a placa do veiculo", second_response)
self.assertIsNotNone(state.get_entry("pending_review_drafts", 21))
state.pop_entry("pending_review_drafts", 21)
third_response = await flow._try_collect_and_schedule_review(
message="quero agendar uma revisao",
user_id=21,
extracted_fields={},
intents={},
turn_decision={"intent": "review_schedule", "domain": "review", "action": "collect_review_schedule"},
)
self.assertIn("Posso reutilizar os dados do ultimo veiculo", third_response)
self.assertIsNotNone(state.get_entry("pending_review_reuse_confirmations", 21))
async def test_review_flow_offers_reuse_on_active_domain_fallback_without_explicit_llm_intent(self):
state = FakeState(
entries={
"last_review_packages": {
21: {
"payload": {
"placa": "ABC1C23",
"modelo": "Onix",
"ano": 2024,
"km": 20000,
"revisao_previa_concessionaria": False,
},
"expires_at": datetime.utcnow() + timedelta(minutes=30),
}
}
},
contexts={
21: {
"active_domain": "review",
"generic_memory": {"placa": "ABC1C23"},
"shared_memory": {"placa": "ABC1C23"},
"last_stock_results": [],
"selected_vehicle": None,
}
},
)
registry = FakeRegistry()
flow = ReviewFlowHarness(state=state, registry=registry)
response = await flow._try_collect_and_schedule_review(
message="quero agendar uma revisao",
user_id=21,
extracted_fields={},
intents={},
turn_decision={"intent": "general", "domain": "general", "action": "answer_user"},
)
self.assertIn("Posso reutilizar os dados do ultimo veiculo", response)
self.assertIsNotNone(state.get_entry("pending_review_reuse_confirmations", 21))
async def test_review_flow_explicit_reuse_request_opens_reuse_confirmation_even_without_pending_prompt(self):
state = FakeState(
entries={
"last_review_packages": {
21: {
"payload": {
"placa": "A0T1C23",
"modelo": "Onix",
"ano": 2024,
"km": 20000,
"revisao_previa_concessionaria": False,
},
"expires_at": datetime.utcnow() + timedelta(minutes=30),
}
}
}
)
registry = FakeRegistry()
flow = ReviewFlowHarness(state=state, registry=registry)
response = await flow._try_collect_and_schedule_review(
message="eu gostaria de reaproveitar as informacoes do ultimo carro",
user_id=21,
extracted_fields={},
intents={},
turn_decision={"intent": "general", "domain": "general", "action": "answer_user"},
)
self.assertIn("Posso reutilizar os dados do ultimo veiculo", response)
self.assertIsNotNone(state.get_entry("pending_review_reuse_confirmations", 21))
async def test_review_flow_reuses_vehicle_with_date_only_and_requests_missing_time(self):
state = FakeState(
entries={
@ -1730,6 +2266,98 @@ class ReviewFlowDraftTests(unittest.IsolatedAsyncioTestCase):
class ContextSwitchPolicyTests(unittest.TestCase):
def test_handle_context_switch_ignores_ambiguous_confirmation_while_review_reuse_is_pending(self):
state = FakeState(
entries={
"pending_review_reuse_confirmations": {
9: {
"payload": {
"placa": "ABC1463",
"modelo": "Civic",
"ano": 2024,
"km": 30000,
"revisao_previa_concessionaria": False,
},
"expires_at": datetime.utcnow() + timedelta(minutes=15),
}
}
},
contexts={
9: {
"pending_switch": None,
"active_domain": "review",
"generic_memory": {},
"pending_order_selection": None,
}
},
)
service = FakeService(state)
policy = ConversationPolicy(service=service)
response = policy.handle_context_switch(
message="sim",
user_id=9,
target_domain_hint="sales",
turn_decision={"domain": "sales", "intent": "order_create", "action": "answer_user"},
)
self.assertIsNone(response)
self.assertIsNone(service._get_user_context(9).get("pending_switch"))
def test_handle_context_switch_still_confirms_explicit_domain_change_with_open_review_flow(self):
state = FakeState(
entries={
"pending_review_reuse_confirmations": {
9: {
"payload": {
"placa": "ABC1463",
"modelo": "Civic",
"ano": 2024,
"km": 30000,
"revisao_previa_concessionaria": False,
},
"expires_at": datetime.utcnow() + timedelta(minutes=15),
}
}
},
contexts={
9: {
"pending_switch": None,
"active_domain": "review",
"generic_memory": {},
"pending_order_selection": None,
}
},
)
service = FakeService(state)
policy = ConversationPolicy(service=service)
response = policy.handle_context_switch(
message="agora quero comprar um carro",
user_id=9,
target_domain_hint="sales",
turn_decision={"domain": "sales", "intent": "order_create", "action": "answer_user"},
)
self.assertEqual(
response,
"Entendi que voce quer sair de agendamento de revisao e ir para compra de veiculo. Tem certeza?",
)
self.assertIsNotNone(service._get_user_context(9).get("pending_switch"))
def test_render_context_switched_message_guides_next_step_for_sales(self):
state = FakeState()
service = FakeService(state)
policy = ConversationPolicy(service=service)
response = policy.render_context_switched_message("sales")
self.assertEqual(
response,
"Certo, contexto anterior encerrado. Vamos seguir com compra de veiculo.\n"
"Pode me dizer a faixa de preco, o modelo ou o tipo de carro que voce procura.",
)
def test_handle_context_switch_drops_stale_pending_switch_when_user_starts_other_domain(self):
state = FakeState(
contexts={

@ -276,6 +276,35 @@ class TurnDecisionContractTests(unittest.IsolatedAsyncioTestCase):
self.assertEqual(decision["tool_arguments"]["numero_pedido"], "PED-20260310113756-DC1540")
self.assertEqual(decision["tool_arguments"]["motivo"], "desisti da compra")
def test_coerce_turn_decision_normalizes_order_tool_name_alias_and_downgrades_incomplete_call(self):
normalizer = EntityNormalizer()
decision = normalizer.coerce_turn_decision(
{
"intent": "order_create",
"domain": "sales",
"action": "call_tool",
"tool_name": "fazer_pedido",
"tool_arguments": {
"modelo": "Onix 2024",
},
"entities": {
"generic_memory": {},
"review_fields": {},
"review_management_fields": {},
"order_fields": {},
"cancel_order_fields": {},
},
"missing_fields": [],
"response_to_user": None,
}
)
self.assertEqual(decision["action"], "collect_order_create")
self.assertIsNone(decision["tool_name"])
self.assertEqual(decision["tool_arguments"], {})
self.assertEqual(decision["entities"]["order_fields"]["modelo_veiculo"], "Onix 2024")
def test_coerce_turn_decision_normalizes_review_tool_name_alias(self):
normalizer = EntityNormalizer()
@ -570,6 +599,88 @@ class TurnDecisionContractTests(unittest.IsolatedAsyncioTestCase):
self.assertEqual(merged["generic_memory"]["orcamento_max"], 70000)
self.assertEqual(merged["order_fields"]["cpf"], "12345678909")
def test_capture_successful_review_tool_side_effects_store_last_review_package_for_direct_tool_call(self):
state = FakeState(
contexts={
7: {
"active_domain": "review",
"generic_memory": {},
"shared_memory": {},
"order_queue": [],
"pending_order_selection": None,
"pending_switch": None,
"last_stock_results": [],
"selected_vehicle": None,
}
}
)
service = OrquestradorService.__new__(OrquestradorService)
service.state = state
service.normalizer = EntityNormalizer()
service._get_user_context = lambda user_id: state.get_user_context(user_id)
service._save_user_context = lambda user_id, context: state.save_user_context(user_id, context)
service._capture_successful_tool_side_effects(
tool_name="agendar_revisao",
arguments={
"placa": "ABC1463",
"data_hora": "28/04/2026 15:00",
"modelo": "Civic",
"ano": 2024,
"km": 30000,
"revisao_previa_concessionaria": True,
},
tool_result={
"protocolo": "REV-TESTE-123",
"placa": "ABC1463",
"data_hora": "28/04/2026 15:00",
},
user_id=7,
)
cached = state.get_entry("last_review_packages", 7)
self.assertIsNotNone(cached)
self.assertEqual(cached["payload"]["placa"], "ABC1463")
self.assertEqual(cached["payload"]["modelo"], "Civic")
self.assertTrue(cached["payload"]["revisao_previa_concessionaria"])
def test_capture_tool_result_context_stores_pending_stock_selection_entry(self):
state = FakeState(
contexts={
5: {
"active_domain": "sales",
"generic_memory": {},
"shared_memory": {},
"order_queue": [],
"pending_order_selection": None,
"pending_switch": None,
"last_stock_results": [],
"selected_vehicle": None,
}
}
)
service = OrquestradorService.__new__(OrquestradorService)
service.state = state
service.normalizer = EntityNormalizer()
service._get_user_context = lambda user_id: state.get_user_context(user_id)
service._save_user_context = lambda user_id, context: state.save_user_context(user_id, context)
service._capture_tool_result_context(
tool_name="consultar_estoque",
tool_result=[
{"id": 15, "modelo": "Volkswagen T-Cross 2022", "categoria": "suv", "preco": 73224.0, "budget_relaxed": True},
{"id": 11, "modelo": "Toyota Corolla 2024", "categoria": "suv", "preco": 76087.0},
],
user_id=5,
)
cached = state.get_entry("pending_stock_selections", 5)
self.assertIsNotNone(cached)
self.assertEqual(cached["payload"][0]["id"], 15)
self.assertEqual(cached["payload"][1]["modelo"], "Toyota Corolla 2024")
self.assertTrue(cached["payload"][0]["budget_relaxed"])
self.assertFalse(cached["payload"][1]["budget_relaxed"])
def test_entity_merge_can_enrich_message_plan_with_full_extraction(self):
service = OrquestradorService.__new__(OrquestradorService)
service.normalizer = EntityNormalizer()
@ -747,6 +858,7 @@ class TurnDecisionContractTests(unittest.IsolatedAsyncioTestCase):
service = OrquestradorService.__new__(OrquestradorService)
service.state = state
service.normalizer = EntityNormalizer()
service.policy = ConversationPolicy(service=service)
service._empty_extraction_payload = service.normalizer.empty_extraction_payload
service._log_turn_event = lambda *args, **kwargs: None
service._compose_order_aware_response = lambda response, user_id, queue_notice=None: response
@ -955,6 +1067,7 @@ class TurnDecisionContractTests(unittest.IsolatedAsyncioTestCase):
service = OrquestradorService.__new__(OrquestradorService)
service.state = state
service.normalizer = EntityNormalizer()
service.policy = ConversationPolicy(service=service)
service._empty_extraction_payload = service.normalizer.empty_extraction_payload
service._log_turn_event = lambda *args, **kwargs: None
service._compose_order_aware_response = lambda response, user_id, queue_notice=None: response
@ -1080,92 +1193,701 @@ class TurnDecisionContractTests(unittest.IsolatedAsyncioTestCase):
self.assertIn("o modelo do veiculo", response)
def test_should_prioritize_order_flow_when_cancel_draft_is_open(self):
async def test_handle_message_skips_inventory_tool_and_uses_order_flow_for_list_selection_follow_up(self):
state = FakeState(
entries={
"pending_cancel_order_drafts": {
contexts={
1: {
"payload": {"numero_pedido": "PED-202603101204814-6ED33A"},
"expires_at": datetime.utcnow() + timedelta(minutes=15),
}
"active_domain": "sales",
"generic_memory": {"orcamento_max": 70000, "perfil_veiculo": ["suv"]},
"shared_memory": {"orcamento_max": 70000, "perfil_veiculo": ["suv"]},
"order_queue": [],
"pending_order_selection": None,
"pending_switch": None,
"last_stock_results": [
{"id": 15, "modelo": "Volkswagen T-Cross 2022", "categoria": "suv", "preco": 73224.0},
{"id": 11, "modelo": "Toyota Corolla 2024", "categoria": "suv", "preco": 76087.0},
],
"selected_vehicle": None,
}
}
)
service = OrquestradorService.__new__(OrquestradorService)
service.state = state
service.normalizer = EntityNormalizer()
service.policy = ConversationPolicy(service=service)
service._empty_extraction_payload = service.normalizer.empty_extraction_payload
service._log_turn_event = lambda *args, **kwargs: None
service._compose_order_aware_response = lambda response, user_id, queue_notice=None: response
service._get_user_context = lambda user_id: state.get_user_context(user_id)
service._save_user_context = lambda user_id, context: state.save_user_context(user_id, context)
prioritized = service._should_prioritize_order_flow(
turn_decision={"intent": "general", "domain": "general", "action": "answer_user"},
extracted_entities={
async def fake_maybe_auto_advance_next_order(base_response: str, user_id: int | None):
return base_response
service._maybe_auto_advance_next_order = fake_maybe_auto_advance_next_order
service._upsert_user_context = lambda user_id: None
async def fake_extract_turn_decision(message: str, user_id: int | None):
return {
"intent": "inventory_search",
"domain": "sales",
"action": "call_tool",
"entities": {
"generic_memory": {},
"review_fields": {},
"review_management_fields": {},
"order_fields": {},
"cancel_order_fields": {},
"intents": {},
},
user_id=1,
)
"missing_fields": [],
"selection_index": None,
"tool_name": "consultar_estoque",
"tool_arguments": {"preco_max": 70000, "categoria": "suv"},
"response_to_user": "",
}
self.assertTrue(prioritized)
service._extract_turn_decision_with_llm = fake_extract_turn_decision
async def test_pending_order_selection_prefers_turn_decision_domain(self):
state = FakeState(
contexts={
9: {
"pending_order_selection": {
async def fake_try_handle_immediate_context_reset(**kwargs):
return None
service._try_handle_immediate_context_reset = fake_try_handle_immediate_context_reset
async def fake_try_resolve_pending_order_selection(**kwargs):
return None
service._try_resolve_pending_order_selection = fake_try_resolve_pending_order_selection
async def fake_try_continue_queued_order(**kwargs):
return None
service._try_continue_queued_order = fake_try_continue_queued_order
async def fake_extract_message_plan(message: str, user_id: int | None):
return {
"orders": [
{"domain": "review", "message": "agendar revisao", "memory_seed": {}},
{"domain": "sales", "message": "fazer pedido", "memory_seed": {}},
],
"expires_at": datetime.utcnow() + timedelta(minutes=15),
},
"order_queue": [],
"active_domain": "general",
"generic_memory": {},
{
"domain": "sales",
"message": message,
"entities": service.normalizer.empty_extraction_payload(),
}
]
}
)
policy = ConversationPolicy(service=FakePolicyService(state))
response = await policy.try_resolve_pending_order_selection(
message="quero comprar",
user_id=9,
turn_decision={"domain": "sales", "intent": "order_create", "action": "collect_order_create"},
service._extract_message_plan_with_llm = fake_extract_message_plan
service._prepare_message_for_single_order = lambda message, user_id, routing_plan=None: (message, None, None)
service._resolve_entities_for_message_plan = lambda message_plan, routed_message: service.normalizer.empty_extraction_payload()
async def fake_extract_entities(message: str, user_id: int | None):
return {
"generic_memory": {},
"review_fields": {},
"review_management_fields": {},
"order_fields": {},
"cancel_order_fields": {},
"intents": {},
}
service._extract_entities_with_llm = fake_extract_entities
async def fake_extract_missing_sales_search_context_with_llm(**kwargs):
return {}
service._extract_missing_sales_search_context_with_llm = fake_extract_missing_sales_search_context_with_llm
service._domain_from_intents = lambda intents: "general"
service._handle_context_switch = lambda **kwargs: None
service._update_active_domain = lambda **kwargs: None
async def fake_try_execute_orchestration_control_tool(**kwargs):
return None
service._try_execute_orchestration_control_tool = fake_try_execute_orchestration_control_tool
async def fake_try_execute_business_tool_from_turn_decision(**kwargs):
return "nao deveria chamar tool"
service._try_execute_business_tool_from_turn_decision = fake_try_execute_business_tool_from_turn_decision
async def fake_try_handle_review_management(**kwargs):
return None
service._try_handle_review_management = fake_try_handle_review_management
async def fake_try_confirm_pending_review(**kwargs):
return None
service._try_confirm_pending_review = fake_try_confirm_pending_review
async def fake_try_collect_and_schedule_review(**kwargs):
return None
service._try_collect_and_schedule_review = fake_try_collect_and_schedule_review
async def fake_try_collect_and_cancel_order(**kwargs):
return None
service._try_collect_and_cancel_order = fake_try_collect_and_cancel_order
async def fake_try_handle_order_listing(**kwargs):
return None
service._try_handle_order_listing = fake_try_handle_order_listing
async def fake_try_collect_and_create_order(**kwargs):
return "Para realizar o pedido, preciso dos dados abaixo:\n- o CPF do cliente"
service._try_collect_and_create_order = fake_try_collect_and_create_order
response = await service.handle_message(
"quero a opcao 1",
user_id=1,
)
self.assertIn("Vou comecar por: Venda: fazer pedido", response)
self.assertIn("CPF do cliente", response)
async def test_pending_order_selection_prefers_turn_decision_selection_index(self):
async def test_handle_message_short_circuits_llm_when_pending_stock_selection_matches_list_choice(self):
state = FakeState(
contexts={
9: {
"pending_order_selection": {
"orders": [
{"domain": "review", "message": "agendar revisao", "memory_seed": {}},
{"domain": "sales", "message": "fazer pedido", "memory_seed": {}},
entries={
"pending_stock_selections": {
1: {
"payload": [
{"id": 15, "modelo": "Volkswagen T-Cross 2022", "categoria": "suv", "preco": 73224.0},
{"id": 11, "modelo": "Toyota Corolla 2024", "categoria": "suv", "preco": 76087.0},
],
"expires_at": datetime.utcnow() + timedelta(minutes=15),
}
}
},
contexts={
1: {
"active_domain": "sales",
"generic_memory": {"orcamento_max": 70000, "perfil_veiculo": ["suv"]},
"shared_memory": {"orcamento_max": 70000, "perfil_veiculo": ["suv"]},
"order_queue": [],
"active_domain": "general",
"generic_memory": {},
"pending_order_selection": None,
"pending_switch": None,
"last_stock_results": [],
"selected_vehicle": None,
}
}
)
policy = ConversationPolicy(service=FakePolicyService(state))
service = OrquestradorService.__new__(OrquestradorService)
service.state = state
service.normalizer = EntityNormalizer()
service.policy = ConversationPolicy(service=service)
service._empty_extraction_payload = service.normalizer.empty_extraction_payload
service._log_turn_event = lambda *args, **kwargs: None
service._compose_order_aware_response = lambda response, user_id, queue_notice=None: response
service._get_user_context = lambda user_id: state.get_user_context(user_id)
service._save_user_context = lambda user_id, context: state.save_user_context(user_id, context)
response = await policy.try_resolve_pending_order_selection(
message="esse",
user_id=9,
turn_decision={"domain": "general", "intent": "general", "action": "answer_user", "selection_index": 1},
)
async def fake_maybe_auto_advance_next_order(base_response: str, user_id: int | None):
return base_response
self.assertIn("Vou comecar por: Venda: fazer pedido", response)
service._maybe_auto_advance_next_order = fake_maybe_auto_advance_next_order
service._upsert_user_context = lambda user_id: None
async def test_try_continue_queue_prefers_turn_decision_action(self):
state = FakeState(
async def fake_extract_turn_decision(message: str, user_id: int | None):
raise AssertionError("nao deveria consultar o LLM para selecao pendente de estoque")
service._extract_turn_decision_with_llm = fake_extract_turn_decision
async def fake_try_handle_immediate_context_reset(**kwargs):
return None
service._try_handle_immediate_context_reset = fake_try_handle_immediate_context_reset
async def fake_try_resolve_pending_order_selection(**kwargs):
return None
service._try_resolve_pending_order_selection = fake_try_resolve_pending_order_selection
async def fake_try_continue_queued_order(**kwargs):
return None
service._try_continue_queued_order = fake_try_continue_queued_order
async def fake_try_collect_and_create_order(**kwargs):
return "Para realizar o pedido, preciso dos dados abaixo:\n- o CPF do cliente"
service._try_collect_and_create_order = fake_try_collect_and_create_order
response = await service.handle_message(
"quero a opcao 1",
user_id=1,
)
self.assertIn("CPF do cliente", response)
async def test_handle_message_keeps_sales_flow_when_cpf_follow_up_is_misclassified_as_review(self):
state = FakeState(
entries={
"pending_order_drafts": {
1: {
"payload": {"vehicle_id": 15, "modelo_veiculo": "Volkswagen T-Cross 2022", "valor_veiculo": 73224.0},
"expires_at": datetime.utcnow() + timedelta(minutes=15),
}
}
},
contexts={
1: {
"active_domain": "sales",
"generic_memory": {"orcamento_max": 70000, "perfil_veiculo": ["suv"]},
"shared_memory": {"orcamento_max": 70000, "perfil_veiculo": ["suv"]},
"order_queue": [],
"pending_order_selection": None,
"pending_switch": None,
"last_stock_results": [
{"id": 15, "modelo": "Volkswagen T-Cross 2022", "categoria": "suv", "preco": 73224.0, "budget_relaxed": True},
],
"selected_vehicle": {"id": 15, "modelo": "Volkswagen T-Cross 2022", "categoria": "suv", "preco": 73224.0, "budget_relaxed": True},
}
}
)
service = OrquestradorService.__new__(OrquestradorService)
service.state = state
service.normalizer = EntityNormalizer()
service._empty_extraction_payload = service.normalizer.empty_extraction_payload
service._log_turn_event = lambda *args, **kwargs: None
service._compose_order_aware_response = lambda response, user_id, queue_notice=None: response
service._get_user_context = lambda user_id: state.get_user_context(user_id)
service._save_user_context = lambda user_id, context: state.save_user_context(user_id, context)
async def fake_maybe_auto_advance_next_order(base_response: str, user_id: int | None):
return base_response
service._maybe_auto_advance_next_order = fake_maybe_auto_advance_next_order
service._upsert_user_context = lambda user_id: None
async def fake_extract_turn_decision(message: str, user_id: int | None):
raise AssertionError("nao deveria consultar o LLM para continuar um fluxo de venda aberto")
service._extract_turn_decision_with_llm = fake_extract_turn_decision
async def fake_try_handle_immediate_context_reset(**kwargs):
return None
service._try_handle_immediate_context_reset = fake_try_handle_immediate_context_reset
async def fake_try_resolve_pending_order_selection(**kwargs):
return None
service._try_resolve_pending_order_selection = fake_try_resolve_pending_order_selection
async def fake_try_continue_queued_order(**kwargs):
return None
service._try_continue_queued_order = fake_try_continue_queued_order
async def fake_extract_message_plan(message: str, user_id: int | None):
return {
"orders": [
{
"domain": "sales",
"message": message,
"entities": service.normalizer.empty_extraction_payload(),
}
]
}
service._extract_message_plan_with_llm = fake_extract_message_plan
service._prepare_message_for_single_order = lambda message, user_id, routing_plan=None: (message, None, None)
service._resolve_entities_for_message_plan = lambda message_plan, routed_message: service.normalizer.empty_extraction_payload()
async def fake_extract_entities(message: str, user_id: int | None):
return {
"generic_memory": {},
"review_fields": {},
"review_management_fields": {},
"order_fields": {},
"cancel_order_fields": {},
"intents": {},
}
service._extract_entities_with_llm = fake_extract_entities
async def fake_extract_missing_sales_search_context_with_llm(**kwargs):
return {}
service._extract_missing_sales_search_context_with_llm = fake_extract_missing_sales_search_context_with_llm
service._domain_from_intents = lambda intents: "general"
service._update_active_domain = lambda **kwargs: None
async def fake_try_execute_orchestration_control_tool(**kwargs):
return None
service._try_execute_orchestration_control_tool = fake_try_execute_orchestration_control_tool
async def fake_try_execute_business_tool_from_turn_decision(**kwargs):
return "nao deveria executar tool planejada"
service._try_execute_business_tool_from_turn_decision = fake_try_execute_business_tool_from_turn_decision
async def fake_try_handle_review_management(**kwargs):
return None
service._try_handle_review_management = fake_try_handle_review_management
async def fake_try_confirm_pending_review(**kwargs):
return None
service._try_confirm_pending_review = fake_try_confirm_pending_review
async def fake_try_collect_and_schedule_review(**kwargs):
return None
service._try_collect_and_schedule_review = fake_try_collect_and_schedule_review
async def fake_try_collect_and_cancel_order(**kwargs):
return None
service._try_collect_and_cancel_order = fake_try_collect_and_cancel_order
async def fake_try_handle_order_listing(**kwargs):
return None
service._try_handle_order_listing = fake_try_handle_order_listing
async def fake_try_collect_and_create_order(**kwargs):
return "Pedido criado com sucesso."
service._try_collect_and_create_order = fake_try_collect_and_create_order
response = await service.handle_message(
"12345678909",
user_id=1,
)
self.assertEqual(response, "Pedido criado com sucesso.")
async def test_handle_message_prioritizes_immediate_reset_before_active_sales_follow_up(self):
state = FakeState(
contexts={
1: {
"active_domain": "sales",
"order_queue": [],
"pending_order_selection": None,
"pending_switch": None,
"last_stock_results": [],
"selected_vehicle": None,
}
}
)
service = OrquestradorService.__new__(OrquestradorService)
service.state = state
service.normalizer = EntityNormalizer()
service.policy = ConversationPolicy(service=service)
service._empty_extraction_payload = service.normalizer.empty_extraction_payload
service._log_turn_event = lambda *args, **kwargs: None
service._compose_order_aware_response = lambda response, user_id, queue_notice=None: response
service._get_user_context = lambda user_id: state.get_user_context(user_id)
service._save_user_context = lambda user_id, context: state.save_user_context(user_id, context)
async def fake_maybe_auto_advance_next_order(base_response: str, user_id: int | None):
return base_response
service._maybe_auto_advance_next_order = fake_maybe_auto_advance_next_order
service._upsert_user_context = lambda user_id: None
service._is_order_selection_reset_message = lambda message: True
async def fake_try_handle_immediate_context_reset(**kwargs):
return "Contexto da conversa limpo. Podemos recomecar do zero."
service._try_handle_immediate_context_reset = fake_try_handle_immediate_context_reset
async def fake_try_handle_pending_stock_selection_follow_up(**kwargs):
raise AssertionError("nao deveria entrar no follow-up de estoque antes do reset")
service._try_handle_pending_stock_selection_follow_up = fake_try_handle_pending_stock_selection_follow_up
async def fake_try_handle_active_sales_follow_up(**kwargs):
raise AssertionError("nao deveria entrar no follow-up de vendas antes do reset")
service._try_handle_active_sales_follow_up = fake_try_handle_active_sales_follow_up
response = await service.handle_message(
"esqueca tudo e vamos recomecar",
user_id=1,
)
self.assertEqual(response, "Contexto da conversa limpo. Podemos recomecar do zero.")
async def test_try_handle_immediate_context_reset_treats_vamos_recomecar_suffix_as_pure_reset(self):
state = FakeState(
contexts={
1: {
"active_domain": "sales",
"generic_memory": {"orcamento_max": 65000},
"shared_memory": {"orcamento_max": 65000},
"order_queue": [],
"pending_order_selection": None,
"pending_switch": None,
"last_stock_results": [{"id": 7, "modelo": "Fiat Argo 2020", "categoria": "suv", "preco": 61857.0}],
"selected_vehicle": {"id": 7, "modelo": "Fiat Argo 2020", "categoria": "suv", "preco": 61857.0},
}
}
)
service = OrquestradorService.__new__(OrquestradorService)
service.state = state
service.normalizer = EntityNormalizer()
service.policy = ConversationPolicy(service=service)
service._get_user_context = lambda user_id: state.get_user_context(user_id)
service._save_user_context = lambda user_id, context: state.save_user_context(user_id, context)
async def finish(response: str, queue_notice: str | None = None):
return response
async def fake_handle_message(message: str, user_id: int | None = None):
raise AssertionError("nao deveria reprocessar 'vamos recomecar' como mensagem de negocio")
service.handle_message = fake_handle_message
response = await service._try_handle_immediate_context_reset(
message="esqueca tudo e vamos recomecar",
user_id=1,
turn_decision={"action": "clear_context"},
finish=finish,
)
self.assertEqual(response, "Contexto da conversa limpo. Podemos recomecar do zero.")
self.assertEqual(state.get_user_context(1)["active_domain"], "general")
self.assertEqual(state.get_user_context(1)["generic_memory"], {})
async def test_active_sales_follow_up_allows_new_budget_search_to_reset_open_order_draft(self):
state = FakeState(
entries={
"pending_order_drafts": {
1: {
"payload": {
"cpf": "12345678909",
"vehicle_id": 15,
"modelo_veiculo": "Volkswagen T-Cross 2022",
"valor_veiculo": 73224.0,
},
"expires_at": datetime.utcnow() + timedelta(minutes=15),
}
}
},
contexts={
1: {
"active_domain": "sales",
"generic_memory": {"cpf": "12345678909", "orcamento_max": 70000, "perfil_veiculo": ["suv"]},
"shared_memory": {"cpf": "12345678909", "orcamento_max": 70000, "perfil_veiculo": ["suv"]},
"order_queue": [],
"pending_order_selection": None,
"pending_switch": None,
"last_stock_results": [
{"id": 15, "modelo": "Volkswagen T-Cross 2022", "categoria": "suv", "preco": 73224.0, "budget_relaxed": True},
],
"selected_vehicle": {"id": 15, "modelo": "Volkswagen T-Cross 2022", "categoria": "suv", "preco": 73224.0, "budget_relaxed": True},
}
}
)
service = OrquestradorService.__new__(OrquestradorService)
service.state = state
service.normalizer = EntityNormalizer()
service._get_user_context = lambda user_id: state.get_user_context(user_id)
service._save_user_context = lambda user_id, context: state.save_user_context(user_id, context)
async def fake_try_collect_and_create_order(**kwargs):
raise AssertionError("nao deveria consumir nova busca de estoque no atalho de venda ativa")
service._try_collect_and_create_order = fake_try_collect_and_create_order
async def finish(response: str, queue_notice: str | None = None):
return response
response = await service._try_handle_active_sales_follow_up(
message="agora quero comprar um carro de ate 60 mil",
user_id=1,
finish=finish,
)
self.assertIsNone(response)
self.assertIsNone(state.get_entry("pending_order_drafts", 1))
self.assertIsNone(state.get_user_context(1)["selected_vehicle"])
async def test_orchestration_control_ignores_cancel_flow_tool_for_fresh_sales_request_without_open_flow(self):
state = FakeState(
contexts={
1: {
"active_domain": "sales",
"order_queue": [],
"pending_order_selection": None,
"pending_switch": None,
"last_stock_results": [],
"selected_vehicle": None,
}
}
)
service = OrquestradorService.__new__(OrquestradorService)
service.state = state
service.normalizer = EntityNormalizer()
service.policy = ConversationPolicy(service=service)
class DummyRegistry:
def get_tools(self):
return []
class DummyExecutor:
async def execute(self, tool_name, arguments, user_id=None):
raise AssertionError("nao deveria executar cancelar_fluxo_atual para nova busca operacional")
service.registry = DummyRegistry()
service.tool_executor = DummyExecutor()
service._get_user_context = lambda user_id: state.get_user_context(user_id)
service._save_user_context = lambda user_id, context: state.save_user_context(user_id, context)
service._has_open_flow = lambda user_id, domain: service.policy.has_open_flow(user_id=user_id, domain=domain)
service._is_low_value_response = lambda text: False
async def fake_call_llm_with_trace(**kwargs):
return {
"response": "",
"tool_call": {
"name": "cancelar_fluxo_atual",
"arguments": {"motivo": "cliente mudou de ideia e iniciou nova busca"},
},
}
service._call_llm_with_trace = fake_call_llm_with_trace
async def finish(response: str, queue_notice: str | None = None):
return response
response = await service._try_execute_orchestration_control_tool(
message="agora eu quero comprar um carro de ate 70 mil",
user_id=1,
turn_decision={},
extracted_entities={},
queue_notice=None,
finish=finish,
)
self.assertIsNone(response)
def test_should_prioritize_order_flow_when_cancel_draft_is_open(self):
state = FakeState(
entries={
"pending_cancel_order_drafts": {
1: {
"payload": {"numero_pedido": "PED-202603101204814-6ED33A"},
"expires_at": datetime.utcnow() + timedelta(minutes=15),
}
}
}
)
service = OrquestradorService.__new__(OrquestradorService)
service.state = state
service.normalizer = EntityNormalizer()
prioritized = service._should_prioritize_order_flow(
turn_decision={"intent": "general", "domain": "general", "action": "answer_user"},
extracted_entities={
"generic_memory": {},
"review_fields": {},
"review_management_fields": {},
"order_fields": {},
"cancel_order_fields": {},
"intents": {},
},
user_id=1,
)
self.assertTrue(prioritized)
def test_should_prioritize_order_flow_when_message_selects_vehicle_from_last_stock_results(self):
state = FakeState(
contexts={
1: {
"active_domain": "sales",
"generic_memory": {"orcamento_max": 70000, "perfil_veiculo": ["suv"]},
"shared_memory": {"orcamento_max": 70000, "perfil_veiculo": ["suv"]},
"last_stock_results": [
{"id": 15, "modelo": "Volkswagen T-Cross 2022", "categoria": "suv", "preco": 73224.0},
{"id": 11, "modelo": "Toyota Corolla 2024", "categoria": "suv", "preco": 76087.0},
],
"selected_vehicle": None,
}
}
)
service = OrquestradorService.__new__(OrquestradorService)
service.state = state
service.normalizer = EntityNormalizer()
service._get_user_context = lambda user_id: state.get_user_context(user_id)
prioritized = service._should_prioritize_order_flow(
turn_decision={"intent": "inventory_search", "domain": "sales", "action": "call_tool"},
extracted_entities={
"generic_memory": {},
"review_fields": {},
"review_management_fields": {},
"order_fields": {},
"cancel_order_fields": {},
"intents": {},
},
user_id=1,
message="quero a opcao 1",
)
self.assertTrue(prioritized)
async def test_pending_order_selection_prefers_turn_decision_domain(self):
state = FakeState(
contexts={
9: {
"pending_order_selection": {
"orders": [
{"domain": "review", "message": "agendar revisao", "memory_seed": {}},
{"domain": "sales", "message": "fazer pedido", "memory_seed": {}},
],
"expires_at": datetime.utcnow() + timedelta(minutes=15),
},
"order_queue": [],
"active_domain": "general",
"generic_memory": {},
}
}
)
policy = ConversationPolicy(service=FakePolicyService(state))
response = await policy.try_resolve_pending_order_selection(
message="quero comprar",
user_id=9,
turn_decision={"domain": "sales", "intent": "order_create", "action": "collect_order_create"},
)
self.assertIn("Vou comecar por: Venda: fazer pedido", response)
async def test_pending_order_selection_prefers_turn_decision_selection_index(self):
state = FakeState(
contexts={
9: {
"pending_order_selection": {
"orders": [
{"domain": "review", "message": "agendar revisao", "memory_seed": {}},
{"domain": "sales", "message": "fazer pedido", "memory_seed": {}},
],
"expires_at": datetime.utcnow() + timedelta(minutes=15),
},
"order_queue": [],
"active_domain": "general",
"generic_memory": {},
}
}
)
policy = ConversationPolicy(service=FakePolicyService(state))
response = await policy.try_resolve_pending_order_selection(
message="esse",
user_id=9,
turn_decision={"domain": "general", "intent": "general", "action": "answer_user", "selection_index": 1},
)
self.assertIn("Vou comecar por: Venda: fazer pedido", response)
async def test_try_continue_queue_prefers_turn_decision_action(self):
state = FakeState(
contexts={
9: {
"pending_switch": {
@ -1221,7 +1943,11 @@ class TurnDecisionContractTests(unittest.IsolatedAsyncioTestCase):
turn_decision={"domain": "review", "intent": "review_schedule", "action": "collect_review_schedule"},
)
self.assertEqual(response, "Certo, contexto anterior encerrado. Vamos seguir com agendamento de revisao.")
self.assertEqual(
response,
"Certo, contexto anterior encerrado. Vamos seguir com agendamento de revisao.\n"
"Pode me informar a placa ou, se preferir, ja mandar placa, data/hora, modelo, ano, km e se ja fez revisao.",
)
def test_prepare_message_for_single_order_defers_explicit_domain_switch_with_open_flow(self):
state = FakeState(

Loading…
Cancel
Save