# NOTE: the lines below are repository web-page residue, not part of the module.
# You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
# orquestrador/tests/test_conversation_adjustmen...
#
# 3108 lines
# 127 KiB
# Python

import os
import unittest
from datetime import datetime, timedelta
from app.core.time_utils import utc_now
from unittest.mock import patch
os.environ.setdefault("DEBUG", "false")
from fastapi import HTTPException
from app.services.flows.order_flow import OrderFlowMixin
from app.services.flows.rental_flow import RentalFlowMixin
from app.services.flows.review_flow import ReviewFlowMixin
from app.integrations.telegram_satellite_service import _ensure_supported_runtime_configuration, _split_telegram_text
from app.models.tool_model import ToolDefinition
from app.services.orchestration.conversation_policy import ConversationPolicy
from app.services.orchestration.entity_normalizer import EntityNormalizer
from app.services.orchestration.response_formatter import fallback_format_tool_result
from app.services.tools.tool_registry import ToolRegistry
from app.services.tools.handlers import _parse_data_hora_revisao
class FakeState:
def __init__(self, entries=None, contexts=None):
self.entries = entries or {}
self.contexts = contexts or {}
def get_entry(self, bucket: str, user_id: int | None, *, expire: bool = False):
if user_id is None:
return None
return self.entries.get(bucket, {}).get(user_id)
def set_entry(self, bucket: str, user_id: int | None, value: dict):
if user_id is None:
return
self.entries.setdefault(bucket, {})[user_id] = value
def pop_entry(self, bucket: str, user_id: int | None):
if user_id is None:
return None
return self.entries.get(bucket, {}).pop(user_id, None)
def get_user_context(self, user_id: int | None):
if user_id is None:
return None
return self.contexts.get(user_id)
def save_user_context(self, user_id: int | None, context: dict):
if user_id is None:
return
self.contexts[user_id] = context
class FakeService:
def __init__(self, state):
self.state = state
self.normalizer = EntityNormalizer()
def _is_affirmative_message(self, text: str) -> bool:
normalized = self.normalizer.normalize_text(text).strip().rstrip(".!?,;:")
return normalized in {"sim", "pode", "ok", "confirmo", "aceito", "fechado", "pode sim", "tenho", "tenho sim"}
def _get_user_context(self, user_id: int | None):
return self.state.get_user_context(user_id)
def _save_user_context(self, user_id: int | None, context: dict | None) -> None:
if user_id is None or not isinstance(context, dict):
return
self.state.save_user_context(user_id, context)
class FakeRegistry:
def __init__(self):
self.calls = []
self.raise_http_exception = None
async def execute(self, tool_name: str, arguments: dict, user_id: int | None = None):
self.calls.append((tool_name, arguments, user_id))
if self.raise_http_exception is not None:
raise self.raise_http_exception
if tool_name == "consultar_estoque":
stock_results = [
{"id": 1, "modelo": "Honda Civic 2021", "categoria": "sedan", "preco": 48500.0},
{"id": 2, "modelo": "Toyota Yaris 2020", "categoria": "hatch", "preco": 49900.0},
]
reverse = str(arguments.get("ordenar_preco") or "asc").lower() == "desc"
return sorted(stock_results, key=lambda item: float(item["preco"]), reverse=reverse)
if tool_name == "consultar_frota_aluguel":
return [
{"id": 1, "placa": "RAA1A01", "modelo": "Chevrolet Tracker", "categoria": "suv", "ano": 2024, "valor_diaria": 219.9, "status": "disponivel"},
{"id": 2, "placa": "RAA1A02", "modelo": "Fiat Pulse", "categoria": "suv", "ano": 2024, "valor_diaria": 189.9, "status": "disponivel"},
]
if tool_name == "abrir_locacao_aluguel":
return {
"contrato_numero": "LOC-TESTE-123",
"placa": arguments.get("placa") or "RAA1A01",
"modelo_veiculo": "Chevrolet Tracker",
"categoria": "suv",
"data_inicio": arguments["data_inicio"],
"data_fim_prevista": arguments["data_fim_prevista"],
"valor_diaria": 219.9,
"valor_previsto": 659.7,
"status": "ativa",
"status_veiculo": "alugado",
"cpf": arguments.get("cpf"),
}
if tool_name == "listar_pedidos":
return [
{
"numero_pedido": "PED-TESTE-001",
"modelo_veiculo": "Fiat Argo 2020",
"valor_veiculo": 61857.0,
"status": "Ativo",
},
{
"numero_pedido": "PED-TESTE-002",
"modelo_veiculo": "Toyota Corolla 2020",
"valor_veiculo": 58476.0,
"status": "Cancelado",
},
]
if tool_name == "realizar_pedido":
vehicle_map = {
1: ("Honda Civic 2021", 51524.0),
2: ("Toyota Corolla 2020", 58476.0),
3: ("Chevrolet Onix 2022", 51809.0),
7: ("Fiat Argo 2020", 61857.0),
9: ("Hyundai HB20S 2022", 76000.0),
}
modelo_veiculo, valor_veiculo = vehicle_map[arguments["vehicle_id"]]
return {
"numero_pedido": "PED-TESTE-123",
"status": "Ativo",
"modelo_veiculo": modelo_veiculo,
"valor_veiculo": valor_veiculo,
}
if tool_name == "agendar_revisao":
return {
"protocolo": "REV-TESTE-123",
"placa": arguments["placa"],
"data_hora": arguments["data_hora"],
"valor_revisao": 840.60,
}
if tool_name == "listar_agendamentos_revisao":
return [
{
"protocolo": "REV-TESTE-001",
"placa": "ABC1234",
"data_hora": "13/03/2026 16:00",
"status": "Agendado",
}
]
if tool_name == "cancelar_agendamento_revisao":
return {
"protocolo": arguments["protocolo"],
"placa": "ABC1269",
"data_hora": "13/03/2026 16:00",
"status": "Cancelado",
}
if tool_name == "editar_data_revisao":
return {
"protocolo": arguments["protocolo"],
"placa": "ABC1269",
"data_hora": arguments["nova_data_hora"],
"status": "Remarcado",
}
return {
"numero_pedido": arguments["numero_pedido"],
"status": "Cancelado",
"motivo": arguments["motivo"],
}
def coerce_http_error(self, exc):
detail = exc.detail if isinstance(exc.detail, dict) else {"message": str(exc.detail)}
return {
"code": detail.get("code", "tool_error"),
"message": detail.get("message", str(exc.detail)),
"retryable": bool(detail.get("retryable", False)),
"field": detail.get("field"),
}
class OrderFlowHarness(OrderFlowMixin):
    """Runs ``OrderFlowMixin`` against the in-memory state and registry fakes.

    The methods below supply the collaborators the mixin is exercised with in
    these tests: normalizer delegates, context accessors, yes/no detection and
    a plain-text result formatter.
    """

    def __init__(self, state, registry):
        self.state = state
        # The same fake is exposed under both attribute names.
        self.registry = registry
        self.tool_executor = registry
        self.normalizer = EntityNormalizer()

    def _get_user_context(self, user_id: int | None):
        return self.state.get_user_context(user_id)

    def _save_user_context(self, user_id: int | None, context: dict | None) -> None:
        """Persist ``context`` only for a real user id and a dict payload."""
        if user_id is not None and isinstance(context, dict):
            self.state.save_user_context(user_id, context)

    def _normalize_intents(self, data) -> dict:
        return self.normalizer.normalize_intents(data)

    def _normalize_cancel_order_fields(self, data) -> dict:
        return self.normalizer.normalize_cancel_order_fields(data)

    def _normalize_order_fields(self, data) -> dict:
        return self.normalizer.normalize_order_fields(data)

    def _normalize_text(self, text: str) -> str:
        return self.normalizer.normalize_text(text)

    def _is_affirmative_message(self, text: str) -> bool:
        """Return True when the normalized text is one of the accepted "yes" replies."""
        accepted = {"sim", "pode", "ok", "confirmo", "aceito", "fechado", "pode sim", "continuar"}
        cleaned = self.normalizer.normalize_text(text).strip().rstrip(".!?,;:")
        return cleaned in accepted

    def _is_negative_message(self, text: str) -> bool:
        """Return True when the normalized text is one of the accepted "no" replies."""
        rejected = {"nao", "n\u00e3o", "negativo", "cancelar", "outro", "outra opcao", "outra op\u00e7\u00e3o"}
        cleaned = self.normalizer.normalize_text(text).strip().rstrip(".!?,;:")
        return cleaned in rejected

    def _normalize_positive_number(self, value):
        return self.normalizer.normalize_positive_number(value)

    def _http_exception_detail(self, exc) -> str:
        return str(exc)

    def _fallback_format_tool_result(self, tool_name: str, tool_result) -> str:
        """Render a tool result as plain text.

        Stock listings, created orders and order listings have dedicated
        layouts; any other tool name is rendered as an updated (cancelled) order.
        """
        if tool_name == "consultar_estoque":
            header = f"Encontrei {len(tool_result)} veiculo(s):"
            rows = [
                f"{position}. {vehicle['modelo']} ({vehicle['categoria']}) - R$ {vehicle['preco']:.2f}"
                for position, vehicle in enumerate(tool_result, start=1)
            ]
            footer = "Para escolher, responda com o numero da opcao desejada. Exemplo: 1."
            return "\n".join([header, *rows, footer])

        if tool_name == "realizar_pedido":
            confirmation = (
                f"Pedido criado com sucesso.\n"
                f"Numero: {tool_result['numero_pedido']}\n"
                f"Veiculo: {tool_result['modelo_veiculo']}\n"
                f"Valor: R$ {tool_result['valor_veiculo']:.2f}"
            )
            return confirmation

        if tool_name == "listar_pedidos":
            header = f"Encontrei {len(tool_result)} pedido(s):"
            rows = [
                f"{position}. {order['numero_pedido']} | {order['modelo_veiculo']} | "
                f"{order['status']} | R$ {order['valor_veiculo']:.2f}"
                for position, order in enumerate(tool_result, start=1)
            ]
            return "\n".join([header, *rows])

        summary = (
            f"Pedido {tool_result['numero_pedido']} atualizado.\n"
            f"Status: {tool_result['status']}\n"
            f"Motivo: {tool_result['motivo']}"
        )
        return summary

    def _try_prefill_order_cpf_from_user_profile(self, user_id: int | None, payload: dict) -> None:
        """No profile lookup in tests: always returns ``None`` without touching ``payload``."""
        return None

    def _load_vehicle_by_id(self, vehicle_id: int) -> dict | None:
        """Return a copy of the first ``last_stock_results`` item, across all contexts, with this id."""
        wanted = int(vehicle_id)
        for user_context in self.state.contexts.values():
            for candidate in user_context.get("last_stock_results", []):
                if int(candidate["id"]) == wanted:
                    return dict(candidate)
        return None
class RentalFlowHarness(RentalFlowMixin):
    """Runs ``RentalFlowMixin`` against the in-memory state and registry fakes."""

    def __init__(self, state, registry):
        self.state = state
        # The same fake is exposed under both attribute names.
        self.registry = registry
        self.tool_executor = registry
        self.normalizer = EntityNormalizer()

    def _get_user_context(self, user_id: int | None):
        return self.state.get_user_context(user_id)

    def _save_user_context(self, user_id: int | None, context: dict | None) -> None:
        """Persist ``context`` only for a real user id and a dict payload."""
        if user_id is not None and isinstance(context, dict):
            self.state.save_user_context(user_id, context)

    def _normalize_text(self, text: str) -> str:
        return self.normalizer.normalize_text(text)

    def _decision_intent(self, turn_decision: dict | None) -> str:
        """Return the decision's ``intent`` stripped and lower-cased ('' when missing)."""
        decision = turn_decision or {}
        raw_intent = decision.get("intent") or ""
        return str(raw_intent).strip().lower()

    def _http_exception_detail(self, exc) -> str:
        return str(exc)

    def _reset_pending_rental_states(self, user_id: int | None) -> None:
        """Drop pending rental drafts/selections and clear rental fields in the context."""
        if user_id is None:
            return
        for bucket in ("pending_rental_drafts", "pending_rental_selections"):
            self.state.pop_entry(bucket, user_id)

        context = self._get_user_context(user_id)
        if not isinstance(context, dict):
            return
        context["last_rental_results"] = []
        context["selected_rental_vehicle"] = None
        if context.get("active_task") == "rental_create":
            context["active_task"] = None
        current_domain = str(context.get("active_domain") or "").strip().lower()
        if current_domain == "rental":
            context["active_domain"] = "general"
        self._save_user_context(user_id=user_id, context=context)

    def _fallback_format_tool_result(self, tool_name: str, tool_result) -> str:
        """Render fleet listings and opened rentals as text; anything else via ``str``."""
        if tool_name == "consultar_frota_aluguel":
            header = f"Encontrei {len(tool_result)} veiculo(s) para locacao:"
            rows = [
                f"{position}. {vehicle['modelo']} {vehicle['ano']} | {vehicle['placa']} | "
                f"{vehicle['categoria']} | diaria R$ {vehicle['valor_diaria']:.2f}"
                for position, vehicle in enumerate(tool_result, start=1)
            ]
            footer = "Para seguir com a locacao, informe a placa ou o numero da opcao desejada."
            return "\n".join([header, *rows, footer])

        if tool_name == "abrir_locacao_aluguel":
            confirmation = (
                "Locacao aberta com sucesso.\n"
                f"Contrato: {tool_result['contrato_numero']}\n"
                f"Placa: {tool_result['placa']}\n"
                f"Inicio: {tool_result['data_inicio']}\n"
                f"Devolucao prevista: {tool_result['data_fim_prevista']}"
            )
            return confirmation

        return str(tool_result)
class ReviewFlowHarness(ReviewFlowMixin):
    """Runs ``ReviewFlowMixin`` against the in-memory state and registry fakes.

    Confirmation suggestions and logged turn events are collected in
    ``captured_suggestions`` and ``logged_events`` so tests can inspect them.
    """

    def __init__(self, state, registry, review_now_provider=None):
        self.state = state
        # The same fake is exposed under both attribute names.
        self.registry = registry
        self.tool_executor = registry
        self.normalizer = EntityNormalizer()
        self.captured_suggestions = []
        self.logged_events = []
        self._review_now_provider = review_now_provider

    def _normalize_intents(self, data) -> dict:
        return self.normalizer.normalize_intents(data)

    def _normalize_review_fields(self, data) -> dict:
        return self.normalizer.normalize_review_fields(data)

    def _normalize_review_management_fields(self, data) -> dict:
        return self.normalizer.normalize_review_management_fields(data)

    def _normalize_text(self, text: str) -> str:
        return self.normalizer.normalize_text(text)

    def _normalize_review_datetime_text(self, value) -> str | None:
        return self.normalizer.normalize_review_datetime_text(value)

    def _http_exception_detail(self, exc) -> str:
        """Prefer the ``message`` of a dict ``detail``; otherwise use ``str(exc)``."""
        raw_detail = exc.detail
        message = raw_detail.get("message") if isinstance(raw_detail, dict) else None
        return str(message or exc)

    def _get_user_context(self, user_id: int | None):
        return self.state.get_user_context(user_id)

    def _save_user_context(self, user_id: int | None, context: dict | None) -> None:
        """Persist ``context`` only for a real user id and a dict payload."""
        if user_id is not None and isinstance(context, dict):
            self.state.save_user_context(user_id, context)

    def _fallback_format_tool_result(self, tool_name: str, tool_result) -> str:
        return f"{tool_name}:{tool_result}"

    def _extract_review_protocol_from_text(self, text: str) -> str | None:
        return self.normalizer.extract_review_protocol_from_text(text)

    def _is_affirmative_message(self, text: str) -> bool:
        """Return True when the normalized text is one of the accepted "yes" replies."""
        accepted = {"sim", "pode", "ok", "confirmo", "aceito", "fechado", "pode sim", "tenho", "tenho sim"}
        cleaned = self.normalizer.normalize_text(text).strip().rstrip(".!?,;:")
        return cleaned in accepted

    def _is_negative_message(self, text: str) -> bool:
        """Return True for anything starting with "nao" or one of the listed refusals."""
        cleaned = self.normalizer.normalize_text(text).strip().rstrip(".!?,;:")
        if cleaned.startswith("nao"):
            return True
        return cleaned in {"nao", "nao quero", "prefiro outro", "outro horario"}

    def _capture_review_confirmation_suggestion(self, **kwargs) -> None:
        self.captured_suggestions.append(kwargs)

    def _try_prefill_review_fields_from_memory(self, user_id: int | None, payload: dict) -> None:
        """No memory prefill in tests: always returns ``None`` without touching ``payload``."""
        return None

    def _log_turn_event(self, event: str, **payload) -> None:
        self.logged_events.append((event, payload))

    def _reset_pending_review_states(self, user_id: int | None) -> None:
        """Drop every pending review bucket entry for ``user_id``."""
        review_buckets = (
            "pending_review_drafts",
            "pending_review_management_drafts",
            "pending_review_confirmations",
            "pending_review_reuse_confirmations",
        )
        for bucket in review_buckets:
            self.state.pop_entry(bucket, user_id)
class ConversationAdjustmentsTests(unittest.TestCase):
    """Synchronous checks for the Telegram helpers, formatter, policy and normalizer."""

    def test_telegram_satellite_requires_redis_in_production(self):
        """Production with the ``memory`` backend must raise ``RuntimeError``."""
        environment_patch = patch("app.integrations.telegram_satellite_service.settings.environment", "production")
        backend_patch = patch(
            "app.integrations.telegram_satellite_service.settings.conversation_state_backend",
            "memory",
        )
        with environment_patch, backend_patch:
            with self.assertRaises(RuntimeError):
                _ensure_supported_runtime_configuration()

    def test_telegram_satellite_allows_redis_in_production(self):
        """Production with the ``redis`` backend must pass without raising."""
        environment_patch = patch("app.integrations.telegram_satellite_service.settings.environment", "production")
        backend_patch = patch(
            "app.integrations.telegram_satellite_service.settings.conversation_state_backend",
            "redis",
        )
        with environment_patch, backend_patch:
            _ensure_supported_runtime_configuration()

    def test_telegram_satellite_splits_long_messages_safely(self):
        """Long text is split into several chunks of at most 3800 characters, losing nothing."""
        text = "\n".join(letter * 3900 for letter in "ABC")

        chunks = _split_telegram_text(text)

        self.assertGreater(len(chunks), 1)
        self.assertTrue(all(len(piece) <= 3800 for piece in chunks))
        rebuilt = "\n".join(chunks)
        # Newlines are ignored so only the payload characters are compared.
        self.assertEqual(rebuilt.replace("\n", ""), text.replace("\n", ""))
        for letter in "ABC":
            self.assertIn(letter * 100, rebuilt)

    def test_review_listing_formatter_is_compact_and_complete(self):
        """The review listing shows a count, one numbered row per item and no blank lines."""
        appointments = [
            {
                "protocolo": "REV-1",
                "placa": "ABC1234",
                "data_hora": "2026-03-19T09:30:00",
                "status": "agendado",
            },
            {
                "protocolo": "REV-2",
                "placa": "XYZ9999",
                "data_hora": "2026-03-20T10:00:00",
                "status": "cancelado",
            },
        ]

        response = fallback_format_tool_result("listar_agendamentos_revisao", appointments)

        self.assertIn("Voce tem 2 agendamento(s):", response)
        self.assertIn("1. REV-1 | ABC1234 |", response)
        self.assertIn("2. REV-2 | XYZ9999 |", response)
        self.assertNotIn("\n\n", response)

    def test_defer_flow_cancel_when_order_cancel_draft_waits_for_reason(self):
        """With a cancel-order draft pending, "desisti" is deferred; the explicit command is not."""
        pending_draft = {
            "payload": {"numero_pedido": "PED-20260305120000-ABC123"},
            "expires_at": utc_now() + timedelta(minutes=30),
        }
        state = FakeState(entries={"pending_cancel_order_drafts": {7: pending_draft}})
        policy = ConversationPolicy(service=FakeService(state))

        self.assertTrue(policy.should_defer_flow_cancellation_control("desisti", user_id=7))
        self.assertFalse(policy.should_defer_flow_cancellation_control("cancelar fluxo atual", user_id=7))

    def test_defer_flow_cancel_when_review_reuse_confirmation_is_pending(self):
        """With a review-reuse confirmation pending, "nao" is deferred; the explicit command is not."""
        pending_confirmation = {
            "payload": {
                "placa": "ABC1234",
                "modelo": "Corolla",
                "ano": 2020,
                "km": 30000,
                "revisao_previa_concessionaria": True,
            },
            "expires_at": utc_now() + timedelta(minutes=30),
        }
        state = FakeState(entries={"pending_review_reuse_confirmations": {7: pending_confirmation}})
        policy = ConversationPolicy(service=FakeService(state))

        self.assertTrue(policy.should_defer_flow_cancellation_control("nao", user_id=7))
        self.assertFalse(policy.should_defer_flow_cancellation_control("cancelar fluxo atual", user_id=7))

    def test_defer_flow_cancel_for_fresh_sales_request_without_open_flow(self):
        """A new purchase request in the sales domain is deferred; the explicit command is not."""
        sales_context = {
            "active_domain": "sales",
            "pending_order_selection": None,
            "pending_switch": None,
        }
        state = FakeState(contexts={7: sales_context})
        policy = ConversationPolicy(service=FakeService(state))

        self.assertTrue(policy.should_defer_flow_cancellation_control("agora eu quero comprar um carro de ate 70 mil", user_id=7))
        self.assertFalse(policy.should_defer_flow_cancellation_control("cancelar fluxo atual", user_id=7))

    def test_normalize_datetime_connector_accepts_as_com_acento(self):
        """The accented connector between date and time is dropped."""
        normalized = EntityNormalizer().normalize_datetime_connector("10/03/2026 \u00e0s 09:00")
        self.assertEqual(normalized, "10/03/2026 09:00")

    def test_parse_review_datetime_accepts_as_com_acento(self):
        """The review date parser accepts the accented connector."""
        expected = datetime(2026, 3, 10, 9, 0)
        self.assertEqual(_parse_data_hora_revisao("10/03/2026 \u00e0s 09:00"), expected)

    def test_normalize_review_datetime_extracts_datetime_from_long_review_sentence(self):
        """The date/time is extracted from a sentence carrying other review fields."""
        sentence = "para ABC1234 em 28/03/2026 as 8:00, Corolla, 2020, 30000 km, ja fiz revisao"
        normalized = EntityNormalizer().normalize_review_datetime_text(sentence)
        self.assertEqual(normalized, "28/03/2026 08:00")

    def test_normalize_review_fields_discards_invalid_datetime_noise(self):
        """A ``data_hora`` value without any date is dropped from the normalized fields."""
        noisy_fields = {"data_hora": "quero agendar uma revisao qualquer"}
        self.assertEqual(EntityNormalizer().normalize_review_fields(noisy_fields), {})

    def test_reset_message_variants_strip_previous_context_prefix(self):
        """The "forget previous operations" prefix is detected and removed."""
        policy = ConversationPolicy(service=FakeService(FakeState()))
        message = "Esque\u00e7a as opera\u00e7\u00f5es anteriores, agora quero agendar revis\u00e3o para ABC1234"

        cleaned = policy.remove_order_selection_reset_prefix(message)

        self.assertTrue(policy.is_order_selection_reset_message(message))
        self.assertEqual(cleaned, "quero agendar revis\u00e3o para ABC1234")
class CancelOrderFlowTests(unittest.IsolatedAsyncioTestCase):
    """Async tests for ``_try_collect_and_cancel_order`` on ``OrderFlowHarness``."""

    @staticmethod
    def _state_with_cancel_draft():
        """Build a state where user 42 has a cancel draft that already holds the order number."""
        draft = {
            "payload": {"numero_pedido": "PED-20260305120000-ABC123"},
            "expires_at": utc_now() + timedelta(minutes=30),
        }
        return FakeState(entries={"pending_cancel_order_drafts": {42: draft}})

    def _assert_single_cancel_call(self, registry, expected_reason):
        """Assert exactly one ``cancelar_pedido`` call for user 42 with the given reason."""
        self.assertEqual(len(registry.calls), 1)
        tool_name, arguments, tool_user_id = registry.calls[0]
        self.assertEqual(tool_name, "cancelar_pedido")
        self.assertEqual(tool_user_id, 42)
        self.assertEqual(arguments["numero_pedido"], "PED-20260305120000-ABC123")
        self.assertEqual(arguments["motivo"], expected_reason)

    async def test_cancel_order_flow_accepts_turn_decision_without_legacy_intents(self):
        """An ``order_cancel`` turn decision alone opens the draft and asks for the reason."""
        state = FakeState()
        flow = OrderFlowHarness(state=state, registry=FakeRegistry())
        decision = {"intent": "order_cancel", "domain": "sales", "action": "collect_order_cancel"}

        response = await flow._try_collect_and_cancel_order(
            message="cancelar pedido PED-20260305120000-ABC123",
            user_id=42,
            extracted_fields={"numero_pedido": "PED-20260305120000-ABC123"},
            intents={},
            turn_decision=decision,
        )

        self.assertEqual(response, "Encontrei o pedido informado. Qual o motivo do cancelamento?")
        self.assertIsNotNone(state.get_entry("pending_cancel_order_drafts", 42))

    async def test_cancel_order_flow_consumes_free_text_reason(self):
        """A free-text reply is used as the reason and the draft is consumed."""
        state = self._state_with_cancel_draft()
        registry = FakeRegistry()
        flow = OrderFlowHarness(state=state, registry=registry)

        response = await flow._try_collect_and_cancel_order(
            message="desisti",
            user_id=42,
            extracted_fields={},
            intents={},
        )

        self._assert_single_cancel_call(registry, "desisti")
        self.assertIn("Status: Cancelado", response)
        self.assertIsNone(state.get_entry("pending_cancel_order_drafts", 42))

    async def test_cancel_order_flow_consumes_free_text_reason_even_when_model_repeats_order_cancel_intent(self):
        """The reply is still taken as the reason when the decision repeats ``order_cancel``."""
        state = self._state_with_cancel_draft()
        registry = FakeRegistry()
        flow = OrderFlowHarness(state=state, registry=registry)
        decision = {"intent": "order_cancel", "domain": "sales", "action": "answer_user"}

        response = await flow._try_collect_and_cancel_order(
            message="Eu desisti dessa compra",
            user_id=42,
            extracted_fields={},
            intents={},
            turn_decision=decision,
        )

        self._assert_single_cancel_call(registry, "Eu desisti dessa compra")
        self.assertIn("Pedido PED-20260305120000-ABC123 atualizado.", response)
        self.assertIn("Status: Cancelado", response)
        self.assertIsNone(state.get_entry("pending_cancel_order_drafts", 42))

    async def test_cancel_order_flow_still_requests_reason_when_message_is_too_short(self):
        """A bare "ok" is not accepted as a reason: no tool call, draft kept."""
        state = self._state_with_cancel_draft()
        registry = FakeRegistry()
        flow = OrderFlowHarness(state=state, registry=registry)

        response = await flow._try_collect_and_cancel_order(
            message="ok",
            user_id=42,
            extracted_fields={},
            intents={},
        )

        self.assertEqual(registry.calls, [])
        self.assertIn("o motivo do cancelamento", response)
        self.assertIsNotNone(state.get_entry("pending_cancel_order_drafts", 42))

    async def test_cancel_order_flow_does_not_override_active_order_creation_draft(self):
        """With an order-creation draft open, the cancel flow returns ``None`` and calls nothing."""
        creation_draft = {
            "payload": {},
            "expires_at": utc_now() + timedelta(minutes=30),
        }
        state = FakeState(entries={"pending_order_drafts": {42: creation_draft}})
        registry = FakeRegistry()
        flow = OrderFlowHarness(state=state, registry=registry)

        response = await flow._try_collect_and_cancel_order(
            message="2",
            user_id=42,
            extracted_fields={},
            intents={"order_cancel": True},
        )

        self.assertIsNone(response)
        self.assertEqual(registry.calls, [])

    async def test_cancel_order_flow_restores_draft_from_context_snapshot_when_bucket_is_missing(self):
        """With no bucket entry, the draft comes from the context's ``flow_snapshots``."""
        snapshot = {
            "payload": {"numero_pedido": "PED-20260305120000-ABC123"},
            "expires_at": utc_now() + timedelta(minutes=30),
        }
        context = {
            "active_domain": "sales",
            "active_task": "order_cancel",
            "generic_memory": {},
            "shared_memory": {},
            "collected_slots": {
                "order_cancel": {"numero_pedido": "PED-20260305120000-ABC123"},
            },
            "flow_snapshots": {"order_cancel": snapshot},
            "last_stock_results": [],
            "selected_vehicle": None,
        }
        registry = FakeRegistry()
        flow = OrderFlowHarness(state=FakeState(contexts={42: context}), registry=registry)

        response = await flow._try_collect_and_cancel_order(
            message="desisti da compra",
            user_id=42,
            extracted_fields={},
            intents={},
        )

        self._assert_single_cancel_call(registry, "desisti da compra")
        self.assertIn("Status: Cancelado", response)
class CreateOrderFlowWithVehicleTests(unittest.IsolatedAsyncioTestCase):
async def test_order_listing_preserves_open_order_draft(self):
    """Listing orders calls ``listar_pedidos`` with limit 50 and keeps the open draft."""
    open_draft = {
        "payload": {"cpf": "12345678909"},
        "expires_at": utc_now() + timedelta(minutes=30),
    }
    state = FakeState(entries={"pending_order_drafts": {10: open_draft}})
    registry = FakeRegistry()
    flow = OrderFlowHarness(state=state, registry=registry)
    decision = {"intent": "order_list", "domain": "sales", "action": "call_tool"}

    response = await flow._try_handle_order_listing(
        message="Liste os meus pedidos",
        user_id=10,
        intents={},
        turn_decision=decision,
    )

    first_call = registry.calls[0]
    self.assertEqual(first_call[0], "listar_pedidos")
    self.assertEqual(first_call[1]["limite"], 50)
    self.assertIn("Encontrei 2 pedido(s):", response)
    self.assertIsNotNone(state.get_entry("pending_order_drafts", 10))
async def test_order_listing_ignores_review_appointment_listing_message(self):
    """A request to list review appointments is not handled as an order listing."""
    registry = FakeRegistry()
    flow = OrderFlowHarness(state=FakeState(), registry=registry)
    decision = {"intent": "order_list", "domain": "sales", "action": "call_tool"}

    response = await flow._try_handle_order_listing(
        message="liste para mim os meus agendamentos de revisao",
        user_id=10,
        intents={},
        turn_decision=decision,
    )

    self.assertIsNone(response)
    self.assertEqual(registry.calls, [])
async def test_order_flow_returns_clear_guidance_for_invalid_cpf_follow_up(self):
    """An invalid CPF reply ("123") gets a re-prompt, no tool call, and the draft stays open."""
    civic = {"id": 1, "modelo": "Honda Civic 2021", "categoria": "sedan", "preco": 48500.0}
    draft = {
        "payload": {"vehicle_id": 1, "modelo_veiculo": "Honda Civic 2021", "valor_veiculo": 48500.0},
        "expires_at": utc_now() + timedelta(minutes=30),
    }
    context = {
        "generic_memory": {},
        "shared_memory": {},
        # Separate copies keep the listing entry and the selection distinct objects.
        "last_stock_results": [dict(civic)],
        "selected_vehicle": dict(civic),
    }
    state = FakeState(entries={"pending_order_drafts": {10: draft}}, contexts={10: context})
    registry = FakeRegistry()
    flow = OrderFlowHarness(state=state, registry=registry)
    decision = {"intent": "order_create", "domain": "sales", "action": "collect_order_create"}

    response = await flow._try_collect_and_create_order(
        message="123",
        user_id=10,
        extracted_fields={},
        intents={},
        turn_decision=decision,
    )

    self.assertEqual(response, "Para seguir com o pedido, preciso de um CPF valido. Pode me informar novamente?")
    self.assertEqual(registry.calls, [])
    self.assertIsNotNone(state.get_entry("pending_order_drafts", 10))
async def test_order_flow_rejects_cpf_linked_to_other_user_without_resetting_vehicle_choice(self):
    """A CPF linked to another user is rejected; the draft and selected vehicle are kept."""
    civic = {"id": 1, "modelo": "Honda Civic 2021", "categoria": "sedan", "preco": 48500.0}
    draft = {
        "payload": {"vehicle_id": 1},
        "expires_at": utc_now() + timedelta(minutes=30),
    }
    context = {
        "generic_memory": {},
        "shared_memory": {},
        # Separate copies keep the listing entry and the selection distinct objects.
        "last_stock_results": [dict(civic)],
        "selected_vehicle": dict(civic),
    }
    state = FakeState(entries={"pending_order_drafts": {10: draft}}, contexts={10: context})
    registry = FakeRegistry()
    flow = OrderFlowHarness(state=state, registry=registry)

    async def hydrate_stub(cpf: str, user_id: int | None = None):
        # Simulates the customer lookup reporting that the CPF belongs to someone else.
        raise ValueError("cpf_already_linked")

    hydrate_patch = patch(
        "app.services.flows.order_flow.hydrate_mock_customer_from_cpf",
        new=hydrate_stub,
    )
    with hydrate_patch:
        response = await flow._try_collect_and_create_order(
            message="12345678909",
            user_id=10,
            extracted_fields={},
            intents={},
            turn_decision={"intent": "order_create", "domain": "sales", "action": "collect_order_create"},
        )

    self.assertEqual(response, "Este CPF ja esta vinculado a outro usuario. Pode me informar um CPF diferente?")
    self.assertEqual(registry.calls, [])
    self.assertIsNotNone(state.get_entry("pending_order_drafts", 10))
    self.assertEqual(state.get_user_context(10)["selected_vehicle"]["id"], 1)
async def test_order_flow_auto_lists_stock_on_first_purchase_message_when_budget_exists(self):
    """With a budget already in memory, the first purchase message returns the stock listing."""
    context = {
        "generic_memory": {"cpf": "12345678909", "orcamento_max": 50000},
        "last_stock_results": [],
        "selected_vehicle": None,
    }
    state = FakeState(contexts={10: context})
    registry = FakeRegistry()
    flow = OrderFlowHarness(state=state, registry=registry)

    async def hydrate_stub(cpf: str, user_id: int | None = None):
        return {"cpf": cpf, "user_id": user_id}

    hydrate_patch = patch(
        "app.services.flows.order_flow.hydrate_mock_customer_from_cpf",
        new=hydrate_stub,
    )
    with hydrate_patch:
        response = await flow._try_collect_and_create_order(
            message="Quero comprar um carro de 50 mil, meu CPF e 12345678909",
            user_id=10,
            extracted_fields={"cpf": "12345678909"},
            intents={},
            turn_decision={"intent": "order_create", "domain": "sales", "action": "collect_order_create"},
        )

    self.assertIn("Encontrei 2 veiculo(s):", response)
    self.assertIn("Honda Civic 2021", response)
async def test_order_flow_budget_search_prioritizes_matches_closest_to_ceiling(self):
    """An "ate 50 mil" search sorts by descending price, so the dearest match comes first."""
    context = {
        "generic_memory": {"cpf": "12345678909", "orcamento_max": 50000},
        "last_stock_results": [],
        "selected_vehicle": None,
    }
    state = FakeState(contexts={10: context})
    registry = FakeRegistry()
    flow = OrderFlowHarness(state=state, registry=registry)

    async def hydrate_stub(cpf: str, user_id: int | None = None):
        return {"cpf": cpf, "user_id": user_id}

    hydrate_patch = patch(
        "app.services.flows.order_flow.hydrate_mock_customer_from_cpf",
        new=hydrate_stub,
    )
    with hydrate_patch:
        response = await flow._try_collect_and_create_order(
            message="Quero comprar um carro de ate 50 mil, meu CPF e 12345678909",
            user_id=10,
            extracted_fields={"cpf": "12345678909"},
            intents={},
            turn_decision={"intent": "order_create", "domain": "sales", "action": "collect_order_create"},
        )

    stock_arguments = registry.calls[0][1]
    self.assertEqual(stock_arguments["ordenar_preco"], "desc")
    self.assertIn("1. Toyota Yaris 2020 (hatch) - R$ 49900.00", response)
    self.assertIn("2. Honda Civic 2021 (sedan) - R$ 48500.00", response)
    top_result = flow._get_last_stock_results(user_id=10)[0]
    self.assertEqual(top_result["modelo"], "Toyota Yaris 2020")
async def test_order_flow_budget_search_keeps_cheapest_first_when_user_asks_for_cheapest(self):
    """Asking for the "mais barato" keeps ascending price order in the stock search."""
    context = {
        "generic_memory": {"cpf": "12345678909", "orcamento_max": 50000},
        "last_stock_results": [],
        "selected_vehicle": None,
    }
    state = FakeState(contexts={10: context})
    registry = FakeRegistry()
    flow = OrderFlowHarness(state=state, registry=registry)

    async def hydrate_stub(cpf: str, user_id: int | None = None):
        return {"cpf": cpf, "user_id": user_id}

    hydrate_patch = patch(
        "app.services.flows.order_flow.hydrate_mock_customer_from_cpf",
        new=hydrate_stub,
    )
    with hydrate_patch:
        response = await flow._try_collect_and_create_order(
            message="Quero comprar o carro mais barato ate 50 mil, meu CPF e 12345678909",
            user_id=10,
            extracted_fields={"cpf": "12345678909"},
            intents={},
            turn_decision={"intent": "order_create", "domain": "sales", "action": "collect_order_create"},
        )

    stock_arguments = registry.calls[0][1]
    self.assertEqual(stock_arguments["ordenar_preco"], "asc")
    self.assertIn("1. Honda Civic 2021 (sedan) - R$ 48500.00", response)
    self.assertIn("2. Toyota Yaris 2020 (hatch) - R$ 49900.00", response)
    top_result = flow._get_last_stock_results(user_id=10)[0]
    self.assertEqual(top_result["modelo"], "Honda Civic 2021")
async def test_order_flow_extracts_budget_from_message_when_llm_misses_it(self):
    """With no extracted fields, the budget is read from the message and saved to memory."""
    context = {
        "generic_memory": {},
        "shared_memory": {},
        "last_stock_results": [],
        "selected_vehicle": None,
    }
    state = FakeState(contexts={10: context})
    registry = FakeRegistry()
    flow = OrderFlowHarness(state=state, registry=registry)

    async def hydrate_stub(cpf: str, user_id: int | None = None):
        return {"cpf": cpf, "user_id": user_id}

    hydrate_patch = patch(
        "app.services.flows.order_flow.hydrate_mock_customer_from_cpf",
        new=hydrate_stub,
    )
    with hydrate_patch:
        response = await flow._try_collect_and_create_order(
            message="Quero comprar um carro de 50 mil, meu CPF e 12345678909",
            user_id=10,
            extracted_fields={},
            intents={},
            turn_decision={"intent": "order_create", "domain": "sales", "action": "collect_order_create"},
        )

    self.assertIn("Encontrei 2 veiculo(s):", response)
    saved_memory = state.get_user_context(10)["generic_memory"]
    self.assertEqual(saved_memory["orcamento_max"], 50000)
async def test_order_flow_extracts_cpf_from_followup_message_when_llm_misses_it(self):
    """With an open draft and a budget in memory, a CPF-only follow-up triggers the stock search."""
    empty_draft = {
        "payload": {},
        "expires_at": utc_now() + timedelta(minutes=30),
    }
    context = {
        "generic_memory": {"orcamento_max": 50000},
        "shared_memory": {"orcamento_max": 50000},
        "last_stock_results": [],
        "selected_vehicle": None,
    }
    state = FakeState(
        entries={"pending_order_drafts": {10: empty_draft}},
        contexts={10: context},
    )
    registry = FakeRegistry()
    flow = OrderFlowHarness(state=state, registry=registry)

    async def hydrate_stub(cpf: str, user_id: int | None = None):
        return {"cpf": cpf, "user_id": user_id}

    hydrate_patch = patch(
        "app.services.flows.order_flow.hydrate_mock_customer_from_cpf",
        new=hydrate_stub,
    )
    with hydrate_patch:
        response = await flow._try_collect_and_create_order(
            message="Meu CPF e 12345678909",
            user_id=10,
            extracted_fields={},
            intents={},
        )

    self.assertIn("Encontrei 2 veiculo(s):", response)
    first_tool = registry.calls[0][0]
    self.assertEqual(first_tool, "consultar_estoque")
async def test_order_flow_lists_stock_from_budget_when_vehicle_is_missing(self):
    """With CPF and budget known but no vehicle chosen, the flow lists stock.

    Expects ``consultar_estoque`` as the first tool call, a two-vehicle
    listing in the reply, and both results kept as the last stock results.
    """
    uid = 10
    draft_with_cpf = {
        "payload": {"cpf": "12345678909"},
        "expires_at": utc_now() + timedelta(minutes=30),
    }
    context = {
        "generic_memory": {"cpf": "12345678909", "orcamento_max": 50000},
        "last_stock_results": [],
        "selected_vehicle": None,
    }
    state = FakeState(
        entries={"pending_order_drafts": {uid: draft_with_cpf}},
        contexts={uid: context},
    )
    registry = FakeRegistry()
    flow = OrderFlowHarness(state=state, registry=registry)

    async def stub_hydrate(cpf: str, user_id: int | None = None):
        # Stand-in for the customer lookup that is patched below.
        return {"cpf": cpf, "user_id": user_id}

    hydrate_target = "app.services.flows.order_flow.hydrate_mock_customer_from_cpf"
    with patch(hydrate_target, new=stub_hydrate):
        response = await flow._try_collect_and_create_order(
            message="liste os carros com esse valor em estoque",
            user_id=uid,
            extracted_fields={},
            intents={},
            turn_decision={"intent": "inventory_search", "domain": "sales", "action": "call_tool"},
        )

    first_tool_name = registry.calls[0][0]
    self.assertEqual(first_tool_name, "consultar_estoque")
    for expected_fragment in ("Encontrei 2 veiculo(s):", "Honda Civic 2021"):
        self.assertIn(expected_fragment, response)
    remembered_results = flow._get_last_stock_results(user_id=uid)
    self.assertEqual(len(remembered_results), 2)
async def test_order_flow_restarts_open_draft_when_user_requests_new_budget_search(self):
    """A new budget request replaces an open draft that already had a vehicle.

    The draft and context start out pointing at a T-Cross picked under a
    70000 budget. After asking for a car up to 60 mil the test expects a
    fresh ``consultar_estoque`` call, the budget updated to 60000 and no
    selected vehicle.
    """
    uid = 10
    # Shallow copies keep the stock entry and the selection as distinct dicts.
    t_cross = {"id": 15, "modelo": "Volkswagen T-Cross 2022", "categoria": "suv", "preco": 73224.0, "budget_relaxed": True}
    stale_draft = {
        "payload": {
            "cpf": "12345678909",
            "vehicle_id": 15,
            "modelo_veiculo": "Volkswagen T-Cross 2022",
            "valor_veiculo": 73224.0,
        },
        "expires_at": utc_now() + timedelta(minutes=30),
    }
    stale_context = {
        "active_domain": "sales",
        "generic_memory": {"cpf": "12345678909", "orcamento_max": 70000, "perfil_veiculo": ["suv"]},
        "shared_memory": {"cpf": "12345678909", "orcamento_max": 70000, "perfil_veiculo": ["suv"]},
        "last_stock_results": [dict(t_cross)],
        "selected_vehicle": dict(t_cross),
    }
    state = FakeState(
        entries={"pending_order_drafts": {uid: stale_draft}},
        contexts={uid: stale_context},
    )
    registry = FakeRegistry()
    flow = OrderFlowHarness(state=state, registry=registry)

    async def stub_hydrate(cpf: str, user_id: int | None = None):
        # Stand-in for the customer lookup that is patched below.
        return {"cpf": cpf, "user_id": user_id}

    hydrate_target = "app.services.flows.order_flow.hydrate_mock_customer_from_cpf"
    with patch(hydrate_target, new=stub_hydrate):
        response = await flow._try_collect_and_create_order(
            message="agora quero comprar um carro de ate 60 mil",
            user_id=uid,
            extracted_fields={},
            intents={},
            turn_decision={"intent": "order_create", "domain": "sales", "action": "collect_order_create"},
        )

    self.assertEqual(registry.calls[0][0], "consultar_estoque")
    self.assertIn("Encontrei 2 veiculo(s):", response)
    new_budget = state.get_user_context(uid)["generic_memory"]["orcamento_max"]
    self.assertEqual(new_budget, 60000)
    self.assertIsNone(state.get_user_context(uid)["selected_vehicle"])
async def test_order_flow_accepts_turn_decision_without_legacy_intents(self):
    """An ``order_create`` turn decision alone is enough to place the order.

    ``intents`` is empty; with a remembered CPF and ``vehicle_id`` 1 in the
    extracted fields, exactly one ``realizar_pedido`` call is expected.
    """
    uid = 10
    context = {
        "generic_memory": {"cpf": "12345678909"},
        "last_stock_results": [
            {"id": 1, "modelo": "Honda Civic 2021", "categoria": "sedan", "preco": 51524.0},
        ],
        "selected_vehicle": None,
    }
    state = FakeState(contexts={uid: context})
    registry = FakeRegistry()
    flow = OrderFlowHarness(state=state, registry=registry)

    async def stub_hydrate(cpf: str, user_id: int | None = None):
        # Stand-in for the customer lookup that is patched below.
        return {"cpf": cpf, "user_id": user_id}

    hydrate_target = "app.services.flows.order_flow.hydrate_mock_customer_from_cpf"
    with patch(hydrate_target, new=stub_hydrate):
        response = await flow._try_collect_and_create_order(
            message="esse",
            user_id=uid,
            extracted_fields={"vehicle_id": 1},
            intents={},
            turn_decision={"intent": "order_create", "domain": "sales", "action": "collect_order_create"},
        )

    self.assertEqual(len(registry.calls), 1)
    (only_call,) = registry.calls
    tool_name, arguments, tool_user_id = only_call
    self.assertEqual(tool_name, "realizar_pedido")
    self.assertEqual(tool_user_id, uid)
    self.assertEqual(arguments["vehicle_id"], 1)
    self.assertEqual(arguments["cpf"], "12345678909")
    self.assertIn("Pedido criado com sucesso.", response)
async def test_order_flow_accepts_model_intent_without_keyword_trigger(self):
    """The ``order_create`` intent flag places the order for a bare "esse" message.

    No turn decision is passed; with a remembered CPF and ``vehicle_id`` 1
    in the extracted fields, exactly one ``realizar_pedido`` call is expected.
    """
    uid = 10
    context = {
        "generic_memory": {"cpf": "12345678909"},
        "last_stock_results": [
            {"id": 1, "modelo": "Honda Civic 2021", "categoria": "sedan", "preco": 51524.0},
        ],
        "selected_vehicle": None,
    }
    state = FakeState(contexts={uid: context})
    registry = FakeRegistry()
    flow = OrderFlowHarness(state=state, registry=registry)

    async def stub_hydrate(cpf: str, user_id: int | None = None):
        # Stand-in for the customer lookup that is patched below.
        return {"cpf": cpf, "user_id": user_id}

    hydrate_target = "app.services.flows.order_flow.hydrate_mock_customer_from_cpf"
    with patch(hydrate_target, new=stub_hydrate):
        response = await flow._try_collect_and_create_order(
            message="esse",
            user_id=uid,
            extracted_fields={"vehicle_id": 1},
            intents={"order_create": True},
        )

    self.assertEqual(len(registry.calls), 1)
    (only_call,) = registry.calls
    tool_name, arguments, tool_user_id = only_call
    self.assertEqual(tool_name, "realizar_pedido")
    self.assertEqual(tool_user_id, uid)
    self.assertEqual(arguments["vehicle_id"], 1)
    self.assertEqual(arguments["cpf"], "12345678909")
    self.assertIn("Pedido criado com sucesso.", response)
async def test_order_flow_requests_vehicle_selection_from_last_stock_results(self):
    """An order request without a chosen vehicle asks the user to pick one.

    Two stock results are remembered and nothing is selected; the reply must
    ask for a choice, mention the first listed model, and no tool may run.
    """
    uid = 10
    listed_vehicles = [
        {"id": 1, "modelo": "Honda Civic 2021", "categoria": "sedan", "preco": 51524.0},
        {"id": 2, "modelo": "Toyota Corolla 2020", "categoria": "hatch", "preco": 58476.0},
    ]
    context = {
        "generic_memory": {},
        "last_stock_results": listed_vehicles,
        "selected_vehicle": None,
    }
    state = FakeState(contexts={uid: context})
    registry = FakeRegistry()
    flow = OrderFlowHarness(state=state, registry=registry)

    response = await flow._try_collect_and_create_order(
        message="Quero fazer um pedido",
        user_id=uid,
        extracted_fields={},
        intents={"order_create": True},
    )

    lowered_response = response.lower()
    self.assertIn("escolha primeiro qual veiculo", lowered_response)
    self.assertIn("Honda Civic 2021", response)
    self.assertEqual(registry.calls, [])
async def test_order_flow_single_stock_result_requires_explicit_confirmation(self):
    """A lone stock result is not ordered until the user confirms it.

    Three turns are exercised:

    1. A budget request returns a single vehicle. It is offered but not
       selected, and a pending single-vehicle confirmation is stored.
    2. Sending only the CPF asks again whether to proceed; no new tool call.
    3. Replying "sim" triggers ``realizar_pedido`` for vehicle 7.
    """
    uid = 10
    state = FakeState(
        entries={
            "pending_order_drafts": {
                uid: {
                    "payload": {"cpf": "12345678909"},
                    "expires_at": utc_now() + timedelta(minutes=30),
                }
            }
        },
        contexts={
            uid: {
                "generic_memory": {"cpf": "12345678909", "orcamento_max": 70000},
                "shared_memory": {"orcamento_max": 70000},
                "last_stock_results": [],
                "selected_vehicle": None,
            }
        },
    )
    registry = FakeRegistry()

    async def single_result_execute(tool_name: str, arguments: dict, user_id: int | None = None):
        # Record the call like the default registry, but return one vehicle only.
        registry.calls.append((tool_name, arguments, user_id))
        if tool_name == "consultar_estoque":
            return [{"id": 7, "modelo": "Fiat Argo 2020", "categoria": "suv", "preco": 61857.0}]
        if tool_name == "realizar_pedido":
            return {
                "numero_pedido": "PED-TESTE-123",
                "status": "Ativo",
                "modelo_veiculo": "Fiat Argo 2020",
                "valor_veiculo": 61857.0,
            }
        raise AssertionError(f"Tool inesperada no teste: {tool_name}")

    registry.execute = single_result_execute
    flow = OrderFlowHarness(state=state, registry=registry)

    async def fake_hydrate_mock_customer_from_cpf(cpf: str, user_id: int | None = None):
        return {"cpf": cpf, "user_id": user_id}

    # Keep every turn under the patch so no turn reaches the real customer lookup.
    with patch(
        "app.services.flows.order_flow.hydrate_mock_customer_from_cpf",
        new=fake_hydrate_mock_customer_from_cpf,
    ):
        response = await flow._try_collect_and_create_order(
            message="quero comprar um carro de ate 70 mil",
            user_id=uid,
            extracted_fields={},
            intents={"order_create": True},
        )
        self.assertIn("Encontrei 1 opcao", response)
        self.assertIn("Fiat Argo 2020", response)
        self.assertEqual(len(registry.calls), 1)
        self.assertEqual(registry.calls[0][0], "consultar_estoque")
        # assertIsNone checks identity and reports a clearer failure than
        # assertEqual(..., None).
        self.assertIsNone(state.get_user_context(uid)["selected_vehicle"])
        self.assertIsNotNone(state.get_user_context(uid)["pending_single_vehicle_confirmation"])

        follow_up_response = await flow._try_collect_and_create_order(
            message="12345678909",
            user_id=uid,
            extracted_fields={},
            intents={},
        )
        self.assertIn("Posso seguir com essa opcao", follow_up_response)
        self.assertEqual(len(registry.calls), 1)

        confirmation_response = await flow._try_collect_and_create_order(
            message="sim",
            user_id=uid,
            extracted_fields={},
            intents={},
        )
        self.assertEqual(len(registry.calls), 2)
        self.assertEqual(registry.calls[1][0], "realizar_pedido")
        self.assertEqual(registry.calls[1][1]["vehicle_id"], 7)
        self.assertIn("Pedido criado com sucesso.", confirmation_response)
async def test_order_flow_creates_order_with_selected_vehicle_from_list_index(self):
    """Replying with a list position orders that entry and resets selection state.

    The draft already holds the CPF and two vehicles are listed. The message
    "2" must produce one ``realizar_pedido`` call for vehicle id 2, after
    which the selected vehicle, last stock results and pending
    single-vehicle confirmation are all expected to be cleared.
    """
    uid = 10
    state = FakeState(
        entries={
            "pending_order_drafts": {
                uid: {
                    "payload": {"cpf": "12345678909"},
                    "expires_at": utc_now() + timedelta(minutes=30),
                }
            }
        },
        contexts={
            uid: {
                "generic_memory": {"cpf": "12345678909"},
                "last_stock_results": [
                    {"id": 1, "modelo": "Honda Civic 2021", "categoria": "sedan", "preco": 51524.0},
                    {"id": 2, "modelo": "Toyota Corolla 2020", "categoria": "hatch", "preco": 58476.0},
                ],
                "selected_vehicle": None,
            }
        },
    )
    registry = FakeRegistry()
    flow = OrderFlowHarness(state=state, registry=registry)

    async def fake_hydrate_mock_customer_from_cpf(cpf: str, user_id: int | None = None):
        return {"cpf": cpf, "user_id": user_id}

    with patch(
        "app.services.flows.order_flow.hydrate_mock_customer_from_cpf",
        new=fake_hydrate_mock_customer_from_cpf,
    ):
        response = await flow._try_collect_and_create_order(
            message="2",
            user_id=uid,
            extracted_fields={},
            intents={},
        )

    self.assertEqual(len(registry.calls), 1)
    tool_name, arguments, tool_user_id = registry.calls[0]
    self.assertEqual(tool_name, "realizar_pedido")
    self.assertEqual(tool_user_id, uid)
    self.assertEqual(arguments["vehicle_id"], 2)
    self.assertEqual(arguments["cpf"], "12345678909")
    self.assertIn("Veiculo: Toyota Corolla 2020", response)
    # assertIsNone checks identity and reports a clearer failure than
    # assertEqual(..., None).
    self.assertIsNone(state.get_user_context(uid).get("selected_vehicle"))
    self.assertEqual(state.get_user_context(uid).get("last_stock_results"), [])
    self.assertIsNone(state.get_user_context(uid).get("pending_single_vehicle_confirmation"))
async def test_order_flow_reuses_last_stock_results_when_user_requests_order_by_model_name(self):
    """Naming a listed model selects it; the CPF sent afterwards places the order.

    Turn one names "Hyundai HB20S 2022": the flow must ask for the CPF, call
    no tool and mark vehicle 9 as selected. Turn two sends the CPF and must
    produce a single ``realizar_pedido`` call for that vehicle.
    """
    uid = 10
    listed_vehicles = [
        {"id": 9, "modelo": "Hyundai HB20S 2022", "categoria": "sedan", "preco": 76000.0},
        {"id": 2, "modelo": "Toyota Corolla 2020", "categoria": "hatch", "preco": 58476.0},
    ]
    context = {
        "generic_memory": {},
        "last_stock_results": listed_vehicles,
        "selected_vehicle": None,
    }
    state = FakeState(contexts={uid: context})
    registry = FakeRegistry()
    flow = OrderFlowHarness(state=state, registry=registry)

    first_response = await flow._try_collect_and_create_order(
        message="Quero fazer o pedido do Hyundai HB20S 2022",
        user_id=uid,
        extracted_fields={"modelo_veiculo": "Hyundai HB20S 2022"},
        intents={"order_create": True},
    )
    self.assertIn("cpf do cliente", first_response.lower())
    self.assertEqual(registry.calls, [])
    chosen_vehicle = state.get_user_context(uid)["selected_vehicle"]
    self.assertEqual(chosen_vehicle["id"], 9)

    async def stub_hydrate(cpf: str, user_id: int | None = None):
        # Stand-in for the customer lookup that is patched below.
        return {"cpf": cpf, "user_id": user_id}

    hydrate_target = "app.services.flows.order_flow.hydrate_mock_customer_from_cpf"
    with patch(hydrate_target, new=stub_hydrate):
        second_response = await flow._try_collect_and_create_order(
            message="12345678909",
            user_id=uid,
            extracted_fields={},
            intents={},
        )

    self.assertEqual(len(registry.calls), 1)
    tool_name, arguments, tool_user_id = registry.calls[0]
    self.assertEqual(tool_name, "realizar_pedido")
    self.assertEqual(tool_user_id, uid)
    self.assertEqual(arguments["vehicle_id"], 9)
    self.assertEqual(arguments["cpf"], "12345678909")
    self.assertIn("Veiculo: Hyundai HB20S 2022", second_response)
async def test_order_flow_accepts_partial_model_reference_from_last_stock_results(self):
    """A truncated model name ("T-Cros") still resolves to a listed vehicle.

    Two listed entries share the model "Volkswagen T-Cross 2024"; the test
    expects id 15 to land in the draft and in the selected vehicle, the
    reply to ask for the CPF, and no tool call.
    """
    uid = 10
    listed_vehicles = [
        {"id": 15, "modelo": "Volkswagen T-Cross 2024", "categoria": "hatch", "preco": 59306.0},
        {"id": 16, "modelo": "Volkswagen T-Cross 2024", "categoria": "hatch", "preco": 59306.0},
        {"id": 2, "modelo": "Toyota Corolla 2020", "categoria": "hatch", "preco": 58476.0},
    ]
    context = {
        "active_domain": "sales",
        "generic_memory": {"orcamento_max": 70000},
        "shared_memory": {"orcamento_max": 70000},
        "last_stock_results": listed_vehicles,
        "selected_vehicle": None,
    }
    state = FakeState(contexts={uid: context})
    registry = FakeRegistry()
    flow = OrderFlowHarness(state=state, registry=registry)

    response = await flow._try_collect_and_create_order(
        message="gostaria de fazer o pedido do Volkswagen T-Cros",
        user_id=uid,
        extracted_fields={},
        intents={"order_create": True},
    )

    draft = state.get_entry("pending_order_drafts", uid)
    self.assertIsNotNone(draft)
    draft_payload = draft["payload"]
    self.assertEqual(draft_payload["vehicle_id"], 15)
    self.assertEqual(draft_payload["modelo_veiculo"], "Volkswagen T-Cross 2024")
    self.assertEqual(state.get_user_context(uid)["selected_vehicle"]["id"], 15)
    self.assertIn("cpf do cliente", response.lower())
    self.assertEqual(registry.calls, [])
async def test_order_flow_bootstraps_selection_from_last_stock_results_without_repeating_order_verb(self):
    """"quero a opcao 1" starts an order from the listed results with no intent flag.

    Turn one must select vehicle 7 and ask for the CPF without any tool call.
    Turn two sends the CPF and must produce one ``realizar_pedido`` call for
    that vehicle.
    """
    uid = 10
    listed_vehicles = [
        {"id": 7, "modelo": "Fiat Argo 2020", "categoria": "suv", "preco": 61857.0},
        {"id": 3, "modelo": "Chevrolet Onix 2022", "categoria": "suv", "preco": 51809.0},
    ]
    context = {
        "active_domain": "sales",
        "generic_memory": {"orcamento_max": 70000, "perfil_veiculo": ["suv"]},
        "shared_memory": {"orcamento_max": 70000, "perfil_veiculo": ["suv"]},
        "last_stock_results": listed_vehicles,
        "selected_vehicle": None,
    }
    state = FakeState(contexts={uid: context})
    registry = FakeRegistry()
    flow = OrderFlowHarness(state=state, registry=registry)

    first_response = await flow._try_collect_and_create_order(
        message="quero a opcao 1",
        user_id=uid,
        extracted_fields={},
        intents={},
    )
    self.assertIn("cpf do cliente", first_response.lower())
    self.assertEqual(registry.calls, [])
    self.assertEqual(state.get_user_context(uid)["selected_vehicle"]["id"], 7)

    async def stub_hydrate(cpf: str, user_id: int | None = None):
        # Stand-in for the customer lookup that is patched below.
        return {"cpf": cpf, "user_id": user_id}

    hydrate_target = "app.services.flows.order_flow.hydrate_mock_customer_from_cpf"
    with patch(hydrate_target, new=stub_hydrate):
        second_response = await flow._try_collect_and_create_order(
            message="12345678909",
            user_id=uid,
            extracted_fields={},
            intents={},
        )

    self.assertEqual(len(registry.calls), 1)
    self.assertEqual(registry.calls[0][0], "realizar_pedido")
    self.assertEqual(registry.calls[0][1]["vehicle_id"], 7)
    self.assertEqual(registry.calls[0][1]["cpf"], "12345678909")
    self.assertIn("Veiculo: Fiat Argo 2020", second_response)
async def test_order_flow_reads_vehicle_selection_from_pending_stock_selection_repository_entry(self):
    """The choice is resolved from the ``pending_stock_selections`` bucket.

    The context's ``last_stock_results`` is empty, so the candidate list is
    only available in the bucket entry. Option 1 must select vehicle 7, ask
    for the CPF and call no tool.
    """
    uid = 10
    pending_selection = {
        "payload": [
            {"id": 7, "modelo": "Fiat Argo 2020", "categoria": "suv", "preco": 61857.0},
            {"id": 3, "modelo": "Chevrolet Onix 2022", "categoria": "suv", "preco": 51809.0},
        ],
        "expires_at": utc_now() + timedelta(minutes=30),
    }
    context = {
        "active_domain": "sales",
        "generic_memory": {},
        "shared_memory": {},
        "last_stock_results": [],
        "selected_vehicle": None,
    }
    state = FakeState(
        entries={"pending_stock_selections": {uid: pending_selection}},
        contexts={uid: context},
    )
    registry = FakeRegistry()
    flow = OrderFlowHarness(state=state, registry=registry)

    response = await flow._try_collect_and_create_order(
        message="quero a opcao 1",
        user_id=uid,
        extracted_fields={},
        intents={},
    )

    self.assertIn("cpf do cliente", response.lower())
    self.assertEqual(registry.calls, [])
    chosen_vehicle = state.get_user_context(uid)["selected_vehicle"]
    self.assertEqual(chosen_vehicle["id"], 7)
async def test_order_flow_keeps_relaxed_budget_suggestion_selected_across_follow_up(self):
    """A ``budget_relaxed`` suggestion stays selected until the CPF arrives.

    Both pending options are flagged ``budget_relaxed`` and the first costs
    more than the remembered 70000 budget. Picking "1" must select vehicle 9
    and ask for the CPF; the CPF turn must then order that same vehicle.
    """
    uid = 10
    pending_selection = {
        "payload": [
            {"id": 9, "modelo": "Hyundai HB20S 2022", "categoria": "suv", "preco": 76000.0, "budget_relaxed": True},
            {"id": 2, "modelo": "Toyota Corolla 2020", "categoria": "suv", "preco": 58476.0, "budget_relaxed": True},
        ],
        "expires_at": utc_now() + timedelta(minutes=30),
    }
    context = {
        "active_domain": "sales",
        "generic_memory": {"orcamento_max": 70000, "perfil_veiculo": ["suv"]},
        "shared_memory": {"orcamento_max": 70000, "perfil_veiculo": ["suv"]},
        "last_stock_results": [],
        "selected_vehicle": None,
    }
    state = FakeState(
        entries={"pending_stock_selections": {uid: pending_selection}},
        contexts={uid: context},
    )
    registry = FakeRegistry()
    flow = OrderFlowHarness(state=state, registry=registry)

    first_response = await flow._try_collect_and_create_order(
        message="1",
        user_id=uid,
        extracted_fields={},
        intents={},
    )
    self.assertIn("cpf do cliente", first_response.lower())
    self.assertEqual(state.get_user_context(uid)["selected_vehicle"]["id"], 9)

    async def stub_hydrate(cpf: str, user_id: int | None = None):
        # Stand-in for the customer lookup that is patched below.
        return {"cpf": cpf, "user_id": user_id}

    hydrate_target = "app.services.flows.order_flow.hydrate_mock_customer_from_cpf"
    with patch(hydrate_target, new=stub_hydrate):
        second_response = await flow._try_collect_and_create_order(
            message="12345678909",
            user_id=uid,
            extracted_fields={},
            intents={},
        )

    self.assertEqual(len(registry.calls), 1)
    self.assertEqual(registry.calls[0][0], "realizar_pedido")
    self.assertEqual(registry.calls[0][1]["vehicle_id"], 9)
    self.assertEqual(registry.calls[0][1]["cpf"], "12345678909")
    self.assertIn("Veiculo: Hyundai HB20S 2022", second_response)
async def test_order_flow_restores_draft_from_context_snapshot_when_bucket_is_missing(self):
    """The order completes from the context snapshot when no draft bucket exists.

    No ``pending_order_drafts`` entry is seeded; the vehicle is present only
    in ``collected_slots``, ``flow_snapshots`` and ``selected_vehicle``.
    Sending the CPF must yield one ``realizar_pedido`` call for vehicle 2.
    """
    uid = 10
    # Shallow copies keep the stock entry and the selection as distinct dicts.
    corolla = {"id": 2, "modelo": "Toyota Corolla 2020", "categoria": "hatch", "preco": 58476.0}
    context = {
        "active_domain": "sales",
        "active_task": "order_create",
        "generic_memory": {},
        "shared_memory": {},
        "collected_slots": {
            "order_create": {"vehicle_id": 2, "modelo_veiculo": "Toyota Corolla 2020"},
        },
        "flow_snapshots": {
            "order_create": {
                "payload": {"vehicle_id": 2, "modelo_veiculo": "Toyota Corolla 2020"},
                "expires_at": utc_now() + timedelta(minutes=30),
            }
        },
        "last_stock_results": [dict(corolla)],
        "selected_vehicle": dict(corolla),
    }
    state = FakeState(contexts={uid: context})
    registry = FakeRegistry()
    flow = OrderFlowHarness(state=state, registry=registry)

    async def stub_hydrate(cpf: str, user_id: int | None = None):
        # Stand-in for the customer lookup that is patched below.
        return {"cpf": cpf, "user_id": user_id}

    hydrate_target = "app.services.flows.order_flow.hydrate_mock_customer_from_cpf"
    with patch(hydrate_target, new=stub_hydrate):
        response = await flow._try_collect_and_create_order(
            message="12345678909",
            user_id=uid,
            extracted_fields={},
            intents={},
        )

    self.assertEqual(len(registry.calls), 1)
    tool_name, arguments, tool_user_id = registry.calls[0]
    self.assertEqual(tool_name, "realizar_pedido")
    self.assertEqual(tool_user_id, uid)
    self.assertEqual(arguments["vehicle_id"], 2)
    self.assertEqual(arguments["cpf"], "12345678909")
    self.assertIn("Veiculo: Toyota Corolla 2020", response)
async def test_order_flow_selection_uses_list_position_not_vehicle_id(self):
    """A number is read as a position in the list, never as a vehicle id.

    The list has two entries with ids 3 and 7. The message "3" matches an id
    but no position, so the flow must call no tool and show the numbered
    options again.
    """
    uid = 10
    draft_with_cpf = {
        "payload": {"cpf": "12345678909"},
        "expires_at": utc_now() + timedelta(minutes=30),
    }
    context = {
        "generic_memory": {"cpf": "12345678909"},
        "last_stock_results": [
            {"id": 3, "modelo": "Chevrolet Onix 2022", "categoria": "suv", "preco": 51809.0},
            {"id": 7, "modelo": "Fiat Argo 2020", "categoria": "suv", "preco": 61857.0},
        ],
        "selected_vehicle": None,
    }
    state = FakeState(
        entries={"pending_order_drafts": {uid: draft_with_cpf}},
        contexts={uid: context},
    )
    registry = FakeRegistry()
    flow = OrderFlowHarness(state=state, registry=registry)

    async def stub_hydrate(cpf: str, user_id: int | None = None):
        # Stand-in for the customer lookup that is patched below.
        return {"cpf": cpf, "user_id": user_id}

    hydrate_target = "app.services.flows.order_flow.hydrate_mock_customer_from_cpf"
    with patch(hydrate_target, new=stub_hydrate):
        response = await flow._try_collect_and_create_order(
            message="3",
            user_id=uid,
            extracted_fields={},
            intents={},
        )

    self.assertEqual(registry.calls, [])
    self.assertIn("escolha primeiro qual veiculo", response.lower())
    self.assertIn("1. Chevrolet Onix 2022", response)
    self.assertIn("2. Fiat Argo 2020", response)
async def test_order_flow_keeps_draft_and_clears_retryable_field_on_tool_error(self):
    """A retryable tool error keeps the draft but drops the offending field.

    The registry raises a 409 ``vehicle_already_reserved`` error flagged
    ``retryable`` for field ``vehicle_id``. The reply must carry the error
    message, the draft must survive with its CPF, and ``vehicle_id`` must be
    gone from the payload.
    """
    uid = 10
    draft_with_vehicle = {
        "payload": {"cpf": "12345678909", "vehicle_id": 99},
        "expires_at": utc_now() + timedelta(minutes=30),
    }
    context = {
        "generic_memory": {"cpf": "12345678909"},
        "last_stock_results": [],
        "selected_vehicle": None,
    }
    state = FakeState(
        entries={"pending_order_drafts": {uid: draft_with_vehicle}},
        contexts={uid: context},
    )
    reservation_conflict = HTTPException(
        status_code=409,
        detail={
            "code": "vehicle_already_reserved",
            "message": "Este veiculo ja esta reservado e nao aparece mais no estoque disponivel.",
            "retryable": True,
            "field": "vehicle_id",
        },
    )
    registry = FakeRegistry()
    registry.raise_http_exception = reservation_conflict
    flow = OrderFlowHarness(state=state, registry=registry)

    async def stub_hydrate(cpf: str, user_id: int | None = None):
        # Stand-in for the customer lookup that is patched below.
        return {"cpf": cpf, "user_id": user_id}

    hydrate_target = "app.services.flows.order_flow.hydrate_mock_customer_from_cpf"
    with patch(hydrate_target, new=stub_hydrate):
        response = await flow._try_collect_and_create_order(
            message="quero esse carro",
            user_id=uid,
            extracted_fields={},
            intents={},
        )

    draft = state.get_entry("pending_order_drafts", uid)
    self.assertIn("ja esta reservado", response)
    self.assertIsNotNone(draft)
    surviving_payload = draft["payload"]
    self.assertEqual(surviving_payload.get("cpf"), "12345678909")
    self.assertNotIn("vehicle_id", surviving_payload)
async def test_order_flow_clears_draft_after_non_retryable_credit_rejection(self):
    """A non-retryable credit rejection wipes the draft and all selection state.

    The registry raises a 400 ``credit_not_approved`` error with
    ``retryable`` set to False. The reply must carry the rejection message;
    the draft, the selected vehicle, the last stock results and the pending
    single-vehicle confirmation must all be cleared.
    """
    uid = 10
    state = FakeState(
        entries={
            "pending_order_drafts": {
                uid: {
                    "payload": {"cpf": "12345678909", "vehicle_id": 9, "modelo_veiculo": "Hyundai HB20S 2022"},
                    "expires_at": utc_now() + timedelta(minutes=30),
                }
            }
        },
        contexts={
            uid: {
                "active_domain": "sales",
                "generic_memory": {"cpf": "12345678909"},
                "shared_memory": {"cpf": "12345678909"},
                "last_stock_results": [
                    {"id": 9, "modelo": "Hyundai HB20S 2022", "categoria": "sedan", "preco": 76000.0},
                ],
                "selected_vehicle": {"id": 9, "modelo": "Hyundai HB20S 2022", "categoria": "sedan", "preco": 76000.0},
                "pending_single_vehicle_confirmation": {"id": 9, "modelo": "Hyundai HB20S 2022", "categoria": "sedan", "preco": 76000.0},
            }
        },
    )
    registry = FakeRegistry()
    registry.raise_http_exception = HTTPException(
        status_code=400,
        detail={
            "code": "credit_not_approved",
            "message": "Cliente nao aprovado para este valor. Limite disponivel: R$ 70878.00.",
            "retryable": False,
            "field": "cpf",
        },
    )
    flow = OrderFlowHarness(state=state, registry=registry)

    async def fake_hydrate_mock_customer_from_cpf(cpf: str, user_id: int | None = None):
        return {"cpf": cpf, "user_id": user_id}

    with patch(
        "app.services.flows.order_flow.hydrate_mock_customer_from_cpf",
        new=fake_hydrate_mock_customer_from_cpf,
    ):
        response = await flow._try_collect_and_create_order(
            message="12345678909",
            user_id=uid,
            extracted_fields={},
            intents={},
        )

    self.assertIn("nao aprovado", response)
    self.assertIsNone(state.get_entry("pending_order_drafts", uid))
    # assertIsNone checks identity and reports a clearer failure than
    # assertEqual(..., None).
    self.assertIsNone(state.get_user_context(uid)["selected_vehicle"])
    self.assertEqual(state.get_user_context(uid)["last_stock_results"], [])
    self.assertIsNone(state.get_user_context(uid).get("pending_single_vehicle_confirmation"))
async def test_order_flow_refreshes_stale_stock_results_when_budget_changes(self):
    """Remembered stock results priced above the stated budget are re-queried.

    The context remembers a 45000 budget, but both listed vehicles and the
    selected one cost more. A purchase request for 45 mil must start with
    ``consultar_estoque``, leave the old Onix out of the reply, clear the
    selection and store two results.
    """
    uid = 10
    state = FakeState(
        contexts={
            uid: {
                "generic_memory": {"cpf": "12345678909", "orcamento_max": 45000},
                "last_stock_results": [
                    {"id": 3, "modelo": "Chevrolet Onix 2022", "categoria": "suv", "preco": 51809.0},
                    {"id": 7, "modelo": "Fiat Argo 2020", "categoria": "suv", "preco": 61857.0},
                ],
                "selected_vehicle": {"id": 3, "modelo": "Chevrolet Onix 2022", "categoria": "suv", "preco": 51809.0},
            }
        }
    )
    registry = FakeRegistry()
    flow = OrderFlowHarness(state=state, registry=registry)

    async def fake_hydrate_mock_customer_from_cpf(cpf: str, user_id: int | None = None):
        return {"cpf": cpf, "user_id": user_id}

    with patch(
        "app.services.flows.order_flow.hydrate_mock_customer_from_cpf",
        new=fake_hydrate_mock_customer_from_cpf,
    ):
        response = await flow._try_collect_and_create_order(
            message="Quero comprar um carro de 45 mil, meu CPF e 12345678909",
            user_id=uid,
            extracted_fields={"cpf": "12345678909"},
            intents={},
            turn_decision={"intent": "order_create", "domain": "sales", "action": "collect_order_create"},
        )

    self.assertEqual(registry.calls[0][0], "consultar_estoque")
    self.assertNotIn("Chevrolet Onix 2022", response)
    # assertIsNone checks identity and reports a clearer failure than
    # assertEqual(..., None).
    self.assertIsNone(state.get_user_context(uid)["selected_vehicle"])
    self.assertEqual(len(state.get_user_context(uid)["last_stock_results"]), 2)
async def test_order_flow_refreshes_stale_stock_results_when_profile_changes(self):
    """Remembered results of the wrong category are re-queried.

    The remembered profile is ``hatch`` while the listed and selected vehicle
    is an SUV. A request for a hatch must start with ``consultar_estoque``,
    clear the selection and leave no SUV in the stored results.
    """
    uid = 10
    state = FakeState(
        contexts={
            uid: {
                "generic_memory": {"cpf": "12345678909", "orcamento_max": 50000, "perfil_veiculo": ["hatch"]},
                "last_stock_results": [
                    {"id": 3, "modelo": "Chevrolet Onix 2022", "categoria": "suv", "preco": 48000.0},
                ],
                "selected_vehicle": {"id": 3, "modelo": "Chevrolet Onix 2022", "categoria": "suv", "preco": 48000.0},
            }
        }
    )
    registry = FakeRegistry()
    flow = OrderFlowHarness(state=state, registry=registry)

    async def fake_hydrate_mock_customer_from_cpf(cpf: str, user_id: int | None = None):
        return {"cpf": cpf, "user_id": user_id}

    with patch(
        "app.services.flows.order_flow.hydrate_mock_customer_from_cpf",
        new=fake_hydrate_mock_customer_from_cpf,
    ):
        response = await flow._try_collect_and_create_order(
            message="Quero comprar um hatch de 50 mil, meu CPF e 12345678909",
            user_id=uid,
            extracted_fields={"cpf": "12345678909"},
            intents={},
            turn_decision={"intent": "order_create", "domain": "sales", "action": "collect_order_create"},
        )

    self.assertEqual(registry.calls[0][0], "consultar_estoque")
    # assertIsNone checks identity and reports a clearer failure than
    # assertEqual(..., None).
    self.assertIsNone(state.get_user_context(uid)["selected_vehicle"])
    self.assertTrue(
        all(item.get("categoria") != "suv" for item in state.get_user_context(uid)["last_stock_results"])
    )
class RentalFlowDraftTests(unittest.IsolatedAsyncioTestCase):
    """Draft-handling tests for the rental flow: listing, selection and opening."""

    def _base_context(self):
        """Return a fresh neutral user context with nothing listed or selected."""
        context = {
            "active_domain": "general",
            "active_task": None,
            "generic_memory": {},
            "shared_memory": {},
            "order_queue": [],
            "pending_order_selection": None,
            "pending_switch": None,
            "last_stock_results": [],
            "selected_vehicle": None,
            "last_rental_results": [],
            "selected_rental_vehicle": None,
        }
        return context

    async def test_rental_flow_lists_fleet_and_stores_pending_selection(self):
        """Listing the fleet stores a pending selection and switches the domain.

        Expects ``consultar_frota_aluguel`` as the first tool call, a
        ``pending_rental_selections`` entry, and ``active_domain`` set to
        ``rental``.
        """
        uid = 21
        state = FakeState(contexts={uid: self._base_context()})
        registry = FakeRegistry()
        flow = RentalFlowHarness(state=state, registry=registry)

        response = await flow._try_collect_and_open_rental(
            message="quais carros estao disponiveis para aluguel",
            user_id=uid,
            extracted_fields={},
            intents={},
            turn_decision={"intent": "rental_list", "domain": "rental", "action": "answer_user"},
        )

        first_tool_name = registry.calls[0][0]
        self.assertEqual(first_tool_name, "consultar_frota_aluguel")
        self.assertIn("veiculo(s) para locacao", response)
        self.assertIsNotNone(state.get_entry("pending_rental_selections", uid))
        self.assertEqual(state.get_user_context(uid)["active_domain"], "rental")

    async def test_rental_flow_accepts_vehicle_selection_from_list_index(self):
        """Picking a list position opens a rental draft and asks for both dates.

        With two vehicles pending selection, the message "1" must create a
        draft for rental vehicle 1, mark plate RAA1A01 as selected and ask
        for the start and expected return date-times.
        """
        uid = 21
        pending_selection = {
            "payload": [
                {"id": 1, "placa": "RAA1A01", "modelo": "Chevrolet Tracker", "categoria": "suv", "ano": 2024, "valor_diaria": 219.9, "status": "disponivel"},
                {"id": 2, "placa": "RAA1A02", "modelo": "Fiat Pulse", "categoria": "suv", "ano": 2024, "valor_diaria": 189.9, "status": "disponivel"},
            ],
            "expires_at": utc_now() + timedelta(minutes=15),
        }
        rental_context = {**self._base_context(), "active_domain": "rental"}
        state = FakeState(
            entries={"pending_rental_selections": {uid: pending_selection}},
            contexts={uid: rental_context},
        )
        registry = FakeRegistry()
        flow = RentalFlowHarness(state=state, registry=registry)

        response = await flow._try_collect_and_open_rental(
            message="1",
            user_id=uid,
            extracted_fields={},
            intents={},
            turn_decision={"intent": "rental_create", "domain": "rental", "action": "answer_user"},
        )

        draft = state.get_entry("pending_rental_drafts", uid)
        self.assertIsNotNone(draft)
        self.assertEqual(draft["payload"]["rental_vehicle_id"], 1)
        chosen_vehicle = state.get_user_context(uid)["selected_rental_vehicle"]
        self.assertEqual(chosen_vehicle["placa"], "RAA1A01")
        self.assertIn("a data e hora de inicio da locacao", response)
        self.assertIn("a data e hora previstas para devolucao", response)

    async def test_rental_flow_opens_contract_after_collecting_dates(self):
        """Sending both dates completes the draft and opens the rental.

        Expects ``abrir_locacao_aluguel`` with the drafted vehicle and the
        two dates parsed from the message, the draft removed afterwards, and
        the contract code in the reply.
        """
        uid = 21
        open_draft = {
            "payload": {
                "rental_vehicle_id": 1,
                "placa": "RAA1A01",
                "modelo_veiculo": "Chevrolet Tracker",
            },
            "expires_at": utc_now() + timedelta(minutes=15),
        }
        rental_context = {
            **self._base_context(),
            "active_domain": "rental",
            "selected_rental_vehicle": {"id": 1, "placa": "RAA1A01", "modelo": "Chevrolet Tracker", "categoria": "suv", "ano": 2024, "valor_diaria": 219.9, "status": "disponivel"},
        }
        state = FakeState(
            entries={"pending_rental_drafts": {uid: open_draft}},
            contexts={uid: rental_context},
        )
        registry = FakeRegistry()
        flow = RentalFlowHarness(state=state, registry=registry)

        response = await flow._try_collect_and_open_rental(
            message="20/03/2026 10:00 ate 23/03/2026 10:00",
            user_id=uid,
            extracted_fields={},
            intents={},
            turn_decision={"intent": "rental_create", "domain": "rental", "action": "answer_user"},
        )

        self.assertEqual(registry.calls[0][0], "abrir_locacao_aluguel")
        self.assertEqual(registry.calls[0][1]["rental_vehicle_id"], 1)
        self.assertEqual(registry.calls[0][1]["data_inicio"], "20/03/2026 10:00")
        self.assertEqual(registry.calls[0][1]["data_fim_prevista"], "23/03/2026 10:00")
        self.assertIsNone(state.get_entry("pending_rental_drafts", uid))
        self.assertIn("LOC-TESTE-123", response)
class ReviewFlowDraftTests(unittest.IsolatedAsyncioTestCase):
async def test_review_flow_extracts_relative_datetime_from_followup_message(self):
    """A relative date and time in a follow-up message is stored in the draft.

    The clock is pinned to 12/03/2026 09:00. The draft must gain a
    ``data_hora`` ending in "16:00", the reply must still ask for the
    vehicle model, and a logged event must carry ``review_flow_source`` set
    to ``draft``.
    """

    def fixed_now():
        # A def instead of a name-bound lambda (PEP 8), for a usable
        # __name__ in tracebacks.
        return datetime(2026, 3, 12, 9, 0)

    uid = 21
    state = FakeState(
        entries={
            "pending_review_drafts": {
                uid: {
                    "payload": {"placa": "ABC1269"},
                    "expires_at": utc_now() + timedelta(minutes=30),
                }
            }
        }
    )
    registry = FakeRegistry()
    flow = ReviewFlowHarness(state=state, registry=registry, review_now_provider=fixed_now)

    response = await flow._try_collect_and_schedule_review(
        message="Eu gostaria de marcar amanha as 16 horas",
        user_id=uid,
        extracted_fields={},
        intents={},
        turn_decision={"intent": "review_schedule", "domain": "review", "action": "answer_user"},
    )

    draft = state.get_entry("pending_review_drafts", uid)
    self.assertIsNotNone(draft)
    self.assertIn("data_hora", draft["payload"])
    self.assertEqual(draft["payload"]["data_hora"][-5:], "16:00")
    self.assertIn("o modelo do veiculo", response)
    self.assertTrue(any(payload.get("review_flow_source") == "draft" for _, payload in flow.logged_events))
async def test_review_flow_date_only_with_other_missing_fields_mentions_captured_date_and_requested_time(self):
    """A date without a time is kept as a base date and only the time is requested.

    The clock is pinned to 12/03/2026 09:00 and the message is "amanha". The
    draft must store ``data_hora_base`` as 13/03/2026; the reply must
    acknowledge that date, ask for the time and the model, and must not ask
    for the full date and time again.
    """

    def fixed_now():
        # A def instead of a name-bound lambda (PEP 8), for a usable
        # __name__ in tracebacks.
        return datetime(2026, 3, 12, 9, 0)

    uid = 21
    state = FakeState(
        entries={
            "pending_review_drafts": {
                uid: {
                    "payload": {"placa": "ABC1269"},
                    "expires_at": utc_now() + timedelta(minutes=30),
                }
            }
        }
    )
    registry = FakeRegistry()
    flow = ReviewFlowHarness(state=state, registry=registry, review_now_provider=fixed_now)

    response = await flow._try_collect_and_schedule_review(
        message="amanha",
        user_id=uid,
        extracted_fields={},
        intents={},
        turn_decision={"intent": "review_schedule", "domain": "review", "action": "answer_user"},
    )

    draft = state.get_entry("pending_review_drafts", uid)
    self.assertIsNotNone(draft)
    self.assertEqual(draft["payload"].get("data_hora_base"), "13/03/2026")
    self.assertIn("Perfeito. Tenho a data 13/03/2026.", response)
    self.assertIn("- o horario desejado para a revisao", response)
    self.assertIn("- o modelo do veiculo", response)
    self.assertNotIn("- a data e hora desejada para a revisao", response)
async def test_review_flow_keeps_review_draft_when_time_follow_up_is_misclassified_as_sales(self):
    """A time-only reply finishes the review draft despite a sales turn decision.

    The draft lacks only the time (``data_hora_base`` is 13/03/2026). The
    message "16h" arrives with an ``order_create``/``sales`` decision, yet
    ``agendar_revisao`` must run with "13/03/2026 16:00" and the draft must
    be removed.
    """
    uid = 21
    almost_complete_draft = {
        "payload": {
            "placa": "ABC1269",
            "modelo": "Onix",
            "ano": 2024,
            "km": 12000,
            "revisao_previa_concessionaria": False,
            "data_hora_base": "13/03/2026",
        },
        "expires_at": utc_now() + timedelta(minutes=30),
    }
    state = FakeState(entries={"pending_review_drafts": {uid: almost_complete_draft}})
    registry = FakeRegistry()
    flow = ReviewFlowHarness(state=state, registry=registry)

    response = await flow._try_collect_and_schedule_review(
        message="16h",
        user_id=uid,
        extracted_fields={},
        intents={},
        turn_decision={"intent": "order_create", "domain": "sales", "action": "answer_user"},
    )

    self.assertEqual(registry.calls[0][0], "agendar_revisao")
    self.assertEqual(registry.calls[0][1]["data_hora"], "13/03/2026 16:00")
    self.assertIsNone(state.get_entry("pending_review_drafts", uid))
    self.assertIn("REV-TESTE-123", response)
async def test_review_flow_extracts_model_year_km_and_review_history_from_free_text(self):
    """Model, year, mileage and review history are parsed from one sentence.

    The draft already has plate and date-time; the free-text message must
    complete it, so ``agendar_revisao`` runs and the draft is removed.
    """
    uid = 21
    draft_missing_vehicle_data = {
        "payload": {"placa": "ABC1269", "data_hora": "13/03/2026 16:00"},
        "expires_at": utc_now() + timedelta(minutes=30),
    }
    state = FakeState(entries={"pending_review_drafts": {uid: draft_missing_vehicle_data}})
    registry = FakeRegistry()
    flow = ReviewFlowHarness(state=state, registry=registry)

    response = await flow._try_collect_and_schedule_review(
        message="O modelo do meu carro e um Onix e ele e 2021, 30000 km, nunca fiz revisao",
        user_id=uid,
        extracted_fields={},
        intents={},
        turn_decision={"intent": "review_schedule", "domain": "review", "action": "answer_user"},
    )

    self.assertIsNone(state.get_entry("pending_review_drafts", uid))
    self.assertEqual(registry.calls[0][0], "agendar_revisao")
    _, arguments, tool_user_id = registry.calls[0]
    self.assertEqual(tool_user_id, uid)
    # NOTE(review): the expected model keeps the leading article ("Um Onix");
    # confirm this is the intended extraction result.
    self.assertEqual(arguments.get("modelo"), "Um Onix")
    self.assertEqual(arguments.get("ano"), 2021)
    self.assertEqual(arguments.get("km"), 30000)
    self.assertFalse(arguments.get("revisao_previa_concessionaria"))
    self.assertIn("REV-TESTE-123", response)
async def test_review_flow_extracts_short_vehicle_summary_from_free_text(self):
    """A terse "model year, km, history" summary fills every missing draft slot."""
    draft_entry = {
        "payload": {"placa": "ABC1269", "data_hora": "13/03/2026 16:00"},
        "expires_at": utc_now() + timedelta(minutes=30),
    }
    fake_state = FakeState(entries={"pending_review_drafts": {21: draft_entry}})
    fake_registry = FakeRegistry()
    harness = ReviewFlowHarness(state=fake_state, registry=fake_registry)

    decision = {"intent": "review_schedule", "domain": "review", "action": "answer_user"}
    reply = await harness._try_collect_and_schedule_review(
        message="Onix 2024, 12000 km, nao fiz revisao na concessionaria",
        user_id=21,
        extracted_fields={},
        intents={},
        turn_decision=decision,
    )

    self.assertIsNone(fake_state.get_entry("pending_review_drafts", 21))
    self.assertEqual(fake_registry.calls[0][0], "agendar_revisao")
    _, tool_args, called_for_user = fake_registry.calls[0]
    self.assertEqual(called_for_user, 21)
    self.assertEqual(tool_args.get("modelo"), "Onix")
    self.assertEqual(tool_args.get("ano"), 2024)
    self.assertEqual(tool_args.get("km"), 12000)
    self.assertFalse(tool_args.get("revisao_previa_concessionaria"))
    self.assertIn("REV-TESTE-123", reply)
async def test_review_flow_accepts_bare_model_when_it_is_last_missing_field(self):
    """A one-word reply is taken as the model when that is the only slot still open."""
    payload_without_model = {
        "placa": "ABC1269",
        "data_hora": "13/03/2026 16:00",
        "ano": 2024,
        "km": 12000,
        "revisao_previa_concessionaria": False,
    }
    draft_entry = {"payload": payload_without_model, "expires_at": utc_now() + timedelta(minutes=30)}
    fake_state = FakeState(entries={"pending_review_drafts": {21: draft_entry}})
    fake_registry = FakeRegistry()
    harness = ReviewFlowHarness(state=fake_state, registry=fake_registry)

    decision = {"intent": "review_schedule", "domain": "review", "action": "answer_user"}
    reply = await harness._try_collect_and_schedule_review(
        message="Onix",
        user_id=21,
        extracted_fields={},
        intents={},
        turn_decision=decision,
    )

    self.assertIsNone(fake_state.get_entry("pending_review_drafts", 21))
    self.assertEqual(fake_registry.calls[0][0], "agendar_revisao")
    _, tool_args, called_for_user = fake_registry.calls[0]
    self.assertEqual(called_for_user, 21)
    self.assertEqual(tool_args.get("modelo"), "Onix")
    self.assertIn("REV-TESTE-123", reply)
async def test_review_flow_keeps_plate_and_datetime_across_incremental_messages(self):
    """Slots supplied over five separate turns accumulate into one scheduling call.

    The review clock is pinned to 12/03/2026 09:00; the test expects
    "amanha as 16 horas" to end up as ``13/03/2026 16:00`` in the tool call.
    """

    def frozen_clock():
        return datetime(2026, 3, 12, 9, 0)

    fake_state = FakeState()
    fake_registry = FakeRegistry()
    harness = ReviewFlowHarness(state=fake_state, registry=fake_registry, review_now_provider=frozen_clock)

    async def send(text, action="answer_user"):
        # Fresh dicts on every turn so no call can observe another call's mutations.
        return await harness._try_collect_and_schedule_review(
            message=text,
            user_id=21,
            extracted_fields={},
            intents={},
            turn_decision={"intent": "review_schedule", "domain": "review", "action": action},
        )

    await send("gostaria de marcar uma nova revisao agora", action="ask_missing_fields")
    await send("placa ABC1269")
    await send("Eu gostaria de marcar amanha as 16 horas")
    await send("O modelo do meu carro e um Onix e ele e 2021")
    reply = await send("30000 km, nunca fiz revisao")

    self.assertIsNone(fake_state.get_entry("pending_review_drafts", 21))
    self.assertEqual(fake_registry.calls[0][0], "agendar_revisao")
    _, tool_args, called_for_user = fake_registry.calls[0]
    self.assertEqual(called_for_user, 21)
    self.assertEqual(tool_args.get("placa"), "ABC1269")
    self.assertEqual(tool_args.get("data_hora"), "13/03/2026 16:00")
    self.assertEqual(tool_args.get("modelo"), "Um Onix")
    self.assertEqual(tool_args.get("ano"), 2021)
    self.assertEqual(tool_args.get("km"), 30000)
    self.assertFalse(tool_args.get("revisao_previa_concessionaria"))
    self.assertIn("REV-TESTE-123", reply)
async def test_review_flow_bootstraps_from_active_review_context_when_draft_is_missing(self):
    """With no stored draft, an active review domain is enough to start collecting slots.

    The turn decision is "general"; the test expects a draft holding the
    plate, a prompt for the date/time, and a logged event whose payload has
    ``review_flow_source == "active_domain_fallback"``.
    """
    review_context = {
        "active_domain": "review",
        "generic_memory": {},
        "shared_memory": {},
        "order_queue": [],
        "pending_order_selection": None,
        "pending_switch": None,
        "last_stock_results": [],
        "selected_vehicle": None,
    }
    fake_state = FakeState(contexts={21: review_context})
    fake_registry = FakeRegistry()
    harness = ReviewFlowHarness(state=fake_state, registry=fake_registry)

    generic_decision = {"intent": "general", "domain": "general", "action": "answer_user"}
    reply = await harness._try_collect_and_schedule_review(
        message="placa ABC1269",
        user_id=21,
        extracted_fields={},
        intents={},
        turn_decision=generic_decision,
    )

    stored_draft = fake_state.get_entry("pending_review_drafts", 21)
    self.assertIsNotNone(stored_draft)
    self.assertEqual(stored_draft["payload"]["placa"], "ABC1269")
    self.assertIn("a data e hora desejada para a revisao", reply)
    fallback_logged = any(
        event_payload.get("review_flow_source") == "active_domain_fallback"
        for _, event_payload in harness.logged_events
    )
    self.assertTrue(fallback_logged)
async def test_review_flow_restores_draft_from_context_snapshot_when_bucket_is_missing(self):
    """Plate and date/time survive via the user context when the draft bucket is empty.

    Only ``collected_slots`` / ``flow_snapshots`` in the context hold the
    earlier values. After two follow-up turns the tool call must still
    contain them together with the newly supplied fields.
    """
    known_slots = {"placa": "A0T1C23", "data_hora": "14/03/2026 18:00"}
    review_context = {
        "active_domain": "review",
        "active_task": "review_schedule",
        "generic_memory": {},
        "shared_memory": {},
        # Independent copies: the slots and the snapshot payload must not alias.
        "collected_slots": {
            "review_schedule": dict(known_slots),
        },
        "flow_snapshots": {
            "review_schedule": {
                "payload": dict(known_slots),
                "expires_at": utc_now() + timedelta(minutes=30),
            }
        },
        "order_queue": [],
        "pending_order_selection": None,
        "pending_switch": None,
        "last_stock_results": [],
        "selected_vehicle": None,
    }
    fake_state = FakeState(contexts={21: review_context})
    fake_registry = FakeRegistry()
    harness = ReviewFlowHarness(state=fake_state, registry=fake_registry)

    async def send(text, fields):
        return await harness._try_collect_and_schedule_review(
            message=text,
            user_id=21,
            extracted_fields=fields,
            intents={},
            turn_decision={"intent": "review_schedule", "domain": "review", "action": "answer_user"},
        )

    await send("o modelo e Onix e e 2024", {"modelo": "Onix", "ano": 2024})
    reply = await send("20000 km, nunca fiz revisao na concessionaria", {})

    self.assertEqual(fake_registry.calls[0][0], "agendar_revisao")
    _, tool_args, called_for_user = fake_registry.calls[0]
    self.assertEqual(called_for_user, 21)
    self.assertEqual(tool_args.get("placa"), "A0T1C23")
    self.assertEqual(tool_args.get("data_hora"), "14/03/2026 18:00")
    self.assertEqual(tool_args.get("modelo"), "Onix")
    self.assertEqual(tool_args.get("ano"), 2024)
    self.assertEqual(tool_args.get("km"), 20000)
    self.assertFalse(tool_args.get("revisao_previa_concessionaria"))
    self.assertIn("REV-TESTE-123", reply)
async def test_review_flow_offers_reuse_of_last_vehicle_package(self):
    """Starting a new schedule with a stored last-vehicle package triggers a reuse offer.

    The reply must mention the stored model and plate, a reuse confirmation
    must be left pending, and an event with
    ``review_flow_source == "last_review_package"`` must be logged.
    """
    last_vehicle = {
        "placa": "ABC1234",
        "modelo": "Corolla",
        "ano": 2020,
        "km": 30000,
        "revisao_previa_concessionaria": True,
    }
    package_entry = {"payload": last_vehicle, "expires_at": utc_now() + timedelta(minutes=30)}
    fake_state = FakeState(entries={"last_review_packages": {21: package_entry}})
    fake_registry = FakeRegistry()
    harness = ReviewFlowHarness(state=fake_state, registry=fake_registry)

    decision = {"intent": "review_schedule", "domain": "review", "action": "collect_review_schedule"}
    reply = await harness._try_collect_and_schedule_review(
        message="gostaria de agendar uma nova revisao agora",
        user_id=21,
        extracted_fields={},
        intents={},
        turn_decision=decision,
    )

    self.assertIn("Posso reutilizar os dados do ultimo veiculo", reply)
    self.assertIn("Corolla", reply)
    self.assertIn("ABC1234", reply)
    self.assertIsNotNone(fake_state.get_entry("pending_review_reuse_confirmations", 21))
    package_source_logged = any(
        event_payload.get("review_flow_source") == "last_review_package"
        for _, event_payload in harness.logged_events
    )
    self.assertTrue(package_source_logged)
async def test_review_flow_rejects_reuse_and_accepts_new_vehicle_in_same_message(self):
    """Declining reuse while naming a new plate opens a draft seeded with that plate."""
    offered_vehicle = {
        "placa": "ABC1234",
        "modelo": "Corolla",
        "ano": 2020,
        "km": 30000,
        "revisao_previa_concessionaria": True,
    }
    reuse_entry = {"payload": offered_vehicle, "expires_at": utc_now() + timedelta(minutes=30)}
    fake_state = FakeState(entries={"pending_review_reuse_confirmations": {21: reuse_entry}})
    fake_registry = FakeRegistry()
    harness = ReviewFlowHarness(state=fake_state, registry=fake_registry)

    decision = {"intent": "review_schedule", "domain": "review", "action": "collect_review_schedule"}
    reply = await harness._try_collect_and_schedule_review(
        message="nao, agora e outro veiculo, placa ABC1269",
        user_id=21,
        extracted_fields={"placa": "ABC1269"},
        intents={},
        turn_decision=decision,
    )

    new_draft = fake_state.get_entry("pending_review_drafts", 21)
    self.assertIsNone(fake_state.get_entry("pending_review_reuse_confirmations", 21))
    self.assertIsNotNone(new_draft)
    # The draft must carry the new plate, not the one from the declined offer.
    self.assertEqual(new_draft["payload"].get("placa"), "ABC1269")
    self.assertIn("a data e hora desejada para a revisao", reply)
async def test_review_flow_rejects_reuse_without_new_vehicle_and_opens_fresh_draft(self):
    """A plain "nao" to the reuse offer leaves an empty draft and asks for the plate."""
    offered_vehicle = {
        "placa": "ABC1234",
        "modelo": "Corolla",
        "ano": 2020,
        "km": 30000,
        "revisao_previa_concessionaria": True,
    }
    reuse_entry = {"payload": offered_vehicle, "expires_at": utc_now() + timedelta(minutes=30)}
    fake_state = FakeState(entries={"pending_review_reuse_confirmations": {21: reuse_entry}})
    fake_registry = FakeRegistry()
    harness = ReviewFlowHarness(state=fake_state, registry=fake_registry)

    decision = {"intent": "review_schedule", "domain": "review", "action": "collect_review_schedule"}
    reply = await harness._try_collect_and_schedule_review(
        message="nao",
        user_id=21,
        extracted_fields={},
        intents={},
        turn_decision=decision,
    )

    fresh_draft = fake_state.get_entry("pending_review_drafts", 21)
    self.assertIsNone(fake_state.get_entry("pending_review_reuse_confirmations", 21))
    self.assertIsNotNone(fresh_draft)
    # Nothing from the declined package may leak into the new draft.
    self.assertEqual(fresh_draft["payload"], {})
    self.assertIn("a placa do veiculo", reply)
async def test_review_flow_offers_reuse_again_on_next_new_schedule_after_previous_rejection(self):
    """Declining the reuse offer once does not stop it from being offered again.

    Sequence: offer, "nao" (fresh draft opened), the draft is removed by
    hand, then a new scheduling request must produce the offer again.
    """
    last_vehicle = {
        "placa": "A0T1C23",
        "modelo": "Onix",
        "ano": 2024,
        "km": 20000,
        "revisao_previa_concessionaria": False,
    }
    package_entry = {"payload": last_vehicle, "expires_at": utc_now() + timedelta(minutes=30)}
    fake_state = FakeState(entries={"last_review_packages": {21: package_entry}})
    fake_registry = FakeRegistry()
    harness = ReviewFlowHarness(state=fake_state, registry=fake_registry)

    async def send(text):
        # Every turn gets its own argument dicts, as in three independent messages.
        return await harness._try_collect_and_schedule_review(
            message=text,
            user_id=21,
            extracted_fields={},
            intents={},
            turn_decision={"intent": "review_schedule", "domain": "review", "action": "collect_review_schedule"},
        )

    offer_reply = await send("quero agendar uma revisao")
    self.assertIn("Posso reutilizar os dados do ultimo veiculo", offer_reply)

    rejection_reply = await send("nao")
    self.assertIn("a placa do veiculo", rejection_reply)
    self.assertIsNotNone(fake_state.get_entry("pending_review_drafts", 21))

    # Drop the draft opened by the rejection so the next request starts clean.
    fake_state.pop_entry("pending_review_drafts", 21)

    second_offer_reply = await send("quero agendar uma revisao")
    self.assertIn("Posso reutilizar os dados do ultimo veiculo", second_offer_reply)
    self.assertIsNotNone(fake_state.get_entry("pending_review_reuse_confirmations", 21))
async def test_review_flow_offers_reuse_on_active_domain_fallback_without_explicit_llm_intent(self):
    """The reuse offer also appears when the turn decision is "general" but the review domain is active."""
    last_vehicle = {
        "placa": "ABC1C23",
        "modelo": "Onix",
        "ano": 2024,
        "km": 20000,
        "revisao_previa_concessionaria": False,
    }
    package_entry = {"payload": last_vehicle, "expires_at": utc_now() + timedelta(minutes=30)}
    review_context = {
        "active_domain": "review",
        "generic_memory": {"placa": "ABC1C23"},
        "shared_memory": {"placa": "ABC1C23"},
        "last_stock_results": [],
        "selected_vehicle": None,
    }
    fake_state = FakeState(
        entries={"last_review_packages": {21: package_entry}},
        contexts={21: review_context},
    )
    fake_registry = FakeRegistry()
    harness = ReviewFlowHarness(state=fake_state, registry=fake_registry)

    generic_decision = {"intent": "general", "domain": "general", "action": "answer_user"}
    reply = await harness._try_collect_and_schedule_review(
        message="quero agendar uma revisao",
        user_id=21,
        extracted_fields={},
        intents={},
        turn_decision=generic_decision,
    )

    self.assertIn("Posso reutilizar os dados do ultimo veiculo", reply)
    self.assertIsNotNone(fake_state.get_entry("pending_review_reuse_confirmations", 21))
async def test_review_flow_explicit_reuse_request_opens_reuse_confirmation_even_without_pending_prompt(self):
    """Asking outright to reuse the last car's data opens the reuse confirmation.

    No reuse prompt is pending and the turn decision is "general"; only the
    stored last-vehicle package and the message wording are available.
    """
    last_vehicle = {
        "placa": "A0T1C23",
        "modelo": "Onix",
        "ano": 2024,
        "km": 20000,
        "revisao_previa_concessionaria": False,
    }
    package_entry = {"payload": last_vehicle, "expires_at": utc_now() + timedelta(minutes=30)}
    fake_state = FakeState(entries={"last_review_packages": {21: package_entry}})
    fake_registry = FakeRegistry()
    harness = ReviewFlowHarness(state=fake_state, registry=fake_registry)

    generic_decision = {"intent": "general", "domain": "general", "action": "answer_user"}
    reply = await harness._try_collect_and_schedule_review(
        message="eu gostaria de reaproveitar as informacoes do ultimo carro",
        user_id=21,
        extracted_fields={},
        intents={},
        turn_decision=generic_decision,
    )

    self.assertIn("Posso reutilizar os dados do ultimo veiculo", reply)
    self.assertIsNotNone(fake_state.get_entry("pending_review_reuse_confirmations", 21))
async def test_review_flow_reuses_vehicle_with_date_only_and_requests_missing_time(self):
    """Accepting reuse with a date but no time stores the date and asks for the time.

    The extracted ``data_hora`` carries a ``00:00`` time. The test expects
    it to be kept only as ``data_hora_base`` until "as 10 horas" completes
    the slot.
    """
    offered_vehicle = {
        "placa": "ABC1234",
        "modelo": "Onix",
        "ano": 2024,
        "km": 50000,
        "revisao_previa_concessionaria": False,
    }
    reuse_entry = {"payload": offered_vehicle, "expires_at": utc_now() + timedelta(minutes=30)}
    fake_state = FakeState(entries={"pending_review_reuse_confirmations": {21: reuse_entry}})
    fake_registry = FakeRegistry()
    harness = ReviewFlowHarness(state=fake_state, registry=fake_registry)

    async def send(text, fields):
        return await harness._try_collect_and_schedule_review(
            message=text,
            user_id=21,
            extracted_fields=fields,
            intents={},
            turn_decision={"intent": "review_schedule", "domain": "review", "action": "collect_review_schedule"},
        )

    date_reply = await send("Sim, quero marcar uma para o dia 18/08/2026", {"data_hora": "18/08/2026 00:00"})
    stored_draft = fake_state.get_entry("pending_review_drafts", 21)
    self.assertIsNotNone(stored_draft)
    self.assertNotIn("data_hora", stored_draft["payload"])
    self.assertEqual(stored_draft["payload"].get("data_hora_base"), "18/08/2026")
    self.assertIn("Agora me informe o horario desejado", date_reply)

    time_reply = await send("as 10 horas", {})
    first_call = fake_registry.calls[0]
    self.assertEqual(first_call[0], "agendar_revisao")
    self.assertEqual(first_call[1]["data_hora"], "18/08/2026 10:00")
    self.assertIn("REV-TESTE-123", time_reply)
async def test_review_flow_reuses_vehicle_with_relative_date_only_and_requests_missing_time(self):
    """A relative date ("hoje") given while a reuse offer is pending is stored as the base date.

    The review clock is pinned through ``review_now_provider``, as sibling
    tests do, rather than read from the wall clock. Deriving the expected
    date from ``datetime.now()`` made the test fail when the run crossed
    midnight or when the flow's clock differed from the local one.
    """

    def fixed_now():
        return datetime(2026, 3, 12, 9, 0)

    state = FakeState(
        entries={
            "pending_review_reuse_confirmations": {
                21: {
                    "payload": {
                        "placa": "ABC1263",
                        "modelo": "Onix",
                        "ano": 2024,
                        "km": 50000,
                        "revisao_previa_concessionaria": False,
                    },
                    "expires_at": utc_now() + timedelta(minutes=30),
                }
            }
        }
    )
    registry = FakeRegistry()
    flow = ReviewFlowHarness(state=state, registry=registry, review_now_provider=fixed_now)
    # "hoje" must resolve against the injected clock, so the expectation uses it too.
    today_text = fixed_now().strftime("%d/%m/%Y")

    response = await flow._try_collect_and_schedule_review(
        message="eu gostaria de marcar para hoje",
        user_id=21,
        extracted_fields={},
        intents={},
        turn_decision={"intent": "review_schedule", "domain": "review", "action": "collect_review_schedule"},
    )
    draft = state.get_entry("pending_review_drafts", 21)
    self.assertIsNotNone(draft)
    self.assertEqual(draft["payload"].get("data_hora_base"), today_text)
    self.assertIn("Agora me informe o horario desejado", response)

    final_response = await flow._try_collect_and_schedule_review(
        message="as 14 horas",
        user_id=21,
        extracted_fields={},
        intents={},
        turn_decision={"intent": "review_schedule", "domain": "review", "action": "collect_review_schedule"},
    )
    self.assertEqual(registry.calls[0][0], "agendar_revisao")
    self.assertEqual(registry.calls[0][1]["data_hora"], f"{today_text} 14:00")
    self.assertIn("REV-TESTE-123", final_response)
async def test_review_flow_reuse_confirmation_accepts_relative_date_then_time_in_follow_up(self):
    """Reuse accepted with "sim", then "hoje", then a time: three turns end in one scheduling call.

    The review clock is pinned through ``review_now_provider`` instead of
    relying on ``datetime.now()``. That keeps the expected date stable when
    the run crosses midnight or the flow's clock differs from the local one.
    """

    def fixed_now():
        return datetime(2026, 3, 12, 9, 0)

    state = FakeState(
        entries={
            "pending_review_reuse_confirmations": {
                21: {
                    "payload": {
                        "placa": "ABC1263",
                        "modelo": "Onix",
                        "ano": 2024,
                        "km": 50000,
                        "revisao_previa_concessionaria": False,
                    },
                    "expires_at": utc_now() + timedelta(minutes=30),
                }
            }
        }
    )
    registry = FakeRegistry()
    flow = ReviewFlowHarness(state=state, registry=registry, review_now_provider=fixed_now)
    # Expected date comes from the same injected clock the flow uses.
    today_text = fixed_now().strftime("%d/%m/%Y")

    first_response = await flow._try_collect_and_schedule_review(
        message="sim",
        user_id=21,
        extracted_fields={},
        intents={},
        turn_decision={"intent": "review_schedule", "domain": "review", "action": "collect_review_schedule"},
    )
    self.assertIn("Me informe apenas a data e hora desejada", first_response)

    second_response = await flow._try_collect_and_schedule_review(
        message="quero marcar para hoje",
        user_id=21,
        extracted_fields={},
        intents={},
        turn_decision={"intent": "review_schedule", "domain": "review", "action": "collect_review_schedule"},
    )
    draft = state.get_entry("pending_review_drafts", 21)
    self.assertIsNotNone(draft)
    self.assertEqual(draft["payload"].get("data_hora_base"), today_text)
    self.assertIn("Agora me informe o horario desejado", second_response)

    final_response = await flow._try_collect_and_schedule_review(
        message="as 14 horas",
        user_id=21,
        extracted_fields={},
        intents={},
        turn_decision={"intent": "review_schedule", "domain": "review", "action": "collect_review_schedule"},
    )
    self.assertEqual(registry.calls[0][0], "agendar_revisao")
    self.assertEqual(registry.calls[0][1]["data_hora"], f"{today_text} 14:00")
    self.assertIn("REV-TESTE-123", final_response)
async def test_review_flow_clears_stale_pending_confirmation_when_user_starts_new_schedule(self):
    """A new scheduling request discards a leftover confirmation and starts a fresh draft."""
    stale_payload = {
        "placa": "ABC9999",
        "data_hora": "14/03/2026 10:00",
        "modelo": "Corolla",
        "ano": 2020,
        "km": 30000,
        "revisao_previa_concessionaria": True,
    }
    stale_entry = {"payload": stale_payload, "expires_at": utc_now() + timedelta(minutes=30)}
    fake_state = FakeState(entries={"pending_review_confirmations": {21: stale_entry}})
    fake_registry = FakeRegistry()
    harness = ReviewFlowHarness(state=fake_state, registry=fake_registry)

    decision = {"intent": "review_schedule", "domain": "review", "action": "collect_review_schedule"}
    reply = await harness._try_collect_and_schedule_review(
        message="quero agendar uma revisao",
        user_id=21,
        extracted_fields={},
        intents={},
        turn_decision=decision,
    )

    self.assertIsNone(fake_state.get_entry("pending_review_confirmations", 21))
    self.assertIsNotNone(fake_state.get_entry("pending_review_drafts", 21))
    self.assertIn("a placa do veiculo", reply)
async def test_review_flow_keeps_draft_and_clears_data_hora_on_retryable_error(self):
    """A retryable scheduling conflict keeps the draft but drops the rejected date/time.

    The fake registry is set up to raise an HTTP 409 whose detail marks
    ``data_hora`` as the retryable field.
    """
    complete_payload = {
        "placa": "ABC1234",
        "data_hora": "2026-03-10T09:00:00-03:00",
        "modelo": "HB20",
        "ano": 2022,
        "km": 15000,
        "revisao_previa_concessionaria": True,
    }
    draft_entry = {"payload": complete_payload, "expires_at": utc_now() + timedelta(minutes=30)}
    fake_state = FakeState(entries={"pending_review_drafts": {21: draft_entry}})

    conflict_detail = {
        "code": "review_schedule_conflict",
        "message": "O horario solicitado esta ocupado.",
        "retryable": True,
        "field": "data_hora",
        "suggested_iso": "2026-03-10T09:30:00-03:00",
    }
    fake_registry = FakeRegistry()
    fake_registry.raise_http_exception = HTTPException(status_code=409, detail=conflict_detail)
    harness = ReviewFlowHarness(state=fake_state, registry=fake_registry)

    decision = {"intent": "review_schedule", "domain": "review", "action": "call_tool"}
    reply = await harness._try_collect_and_schedule_review(
        message="agendar revisao",
        user_id=21,
        extracted_fields={},
        intents={},
        turn_decision=decision,
    )

    surviving_draft = fake_state.get_entry("pending_review_drafts", 21)
    self.assertIn("ocupado", reply)
    self.assertIsNotNone(surviving_draft)
    self.assertEqual(surviving_draft["payload"].get("placa"), "ABC1234")
    self.assertNotIn("data_hora", surviving_draft["payload"])
async def test_review_management_preserves_reason_from_single_cancel_message(self):
    """Protocol and cancellation reason are both taken from a single message."""
    fake_state = FakeState()
    fake_registry = FakeRegistry()
    harness = ReviewFlowHarness(state=fake_state, registry=fake_registry)

    cancel_decision = {"intent": "review_cancel", "domain": "review", "action": "answer_user"}
    reply = await harness._try_handle_review_management(
        message="Quero cancelar a revisao do protocolo REV-20260313-F754AF27 porque nao vou poder ir",
        user_id=21,
        extracted_fields={},
        intents={},
        turn_decision=cancel_decision,
    )

    first_call = fake_registry.calls[0]
    self.assertEqual(first_call[0], "cancelar_agendamento_revisao")
    self.assertEqual(first_call[1]["protocolo"], "REV-20260313-F754AF27")
    # Only the text after "porque" is expected as the reason.
    self.assertEqual(first_call[1]["motivo"], "nao vou poder ir")
    self.assertIn("cancelar_agendamento_revisao", reply)
async def test_review_management_infers_cancel_intent_from_protocol_message(self):
    """A cancel request carrying a protocol is handled even when the intent says review_schedule."""
    fake_state = FakeState()
    fake_registry = FakeRegistry()
    harness = ReviewFlowHarness(state=fake_state, registry=fake_registry)

    # The intent is deliberately "review_schedule" rather than a cancel intent.
    schedule_decision = {"intent": "review_schedule", "domain": "review", "action": "answer_user"}
    reply = await harness._try_handle_review_management(
        message="eu gostaria de cancelar o meu agendamento REV-20260313-F754AF27",
        user_id=21,
        extracted_fields={},
        intents={},
        turn_decision=schedule_decision,
    )

    first_call = fake_registry.calls[0]
    self.assertEqual(first_call[0], "cancelar_agendamento_revisao")
    self.assertEqual(first_call[1]["protocolo"], "REV-20260313-F754AF27")
    self.assertIn("cancelar_agendamento_revisao", reply)
    self.assertIn("REV-20260313-F754AF27", reply)
async def test_review_management_reschedule_consumes_relative_datetime_follow_up(self):
    """A relative "amanha 11h" reply completes an open reschedule draft.

    With the clock pinned to 12/03/2026 09:00 the test expects the tool to
    receive ``13/03/2026 11:00`` and the management draft to be gone.
    """

    def frozen_clock():
        return datetime(2026, 3, 12, 9, 0)

    reschedule_draft = {
        "action": "reschedule",
        "payload": {"protocolo": "REV-20260313-F754AF27"},
        "expires_at": utc_now() + timedelta(minutes=30),
    }
    fake_state = FakeState(entries={"pending_review_management_drafts": {21: reschedule_draft}})
    fake_registry = FakeRegistry()
    harness = ReviewFlowHarness(state=fake_state, registry=fake_registry, review_now_provider=frozen_clock)

    decision = {"intent": "review_reschedule", "domain": "review", "action": "answer_user"}
    reply = await harness._try_handle_review_management(
        message="amanha 11h",
        user_id=21,
        extracted_fields={},
        intents={},
        turn_decision=decision,
    )

    self.assertIsNone(fake_state.get_entry("pending_review_management_drafts", 21))
    first_call = fake_registry.calls[0]
    self.assertEqual(first_call[0], "editar_data_revisao")
    self.assertEqual(first_call[1]["protocolo"], "REV-20260313-F754AF27")
    self.assertEqual(first_call[1]["nova_data_hora"], "13/03/2026 11:00")
    self.assertIn("13/03/2026 11:00", reply)
async def test_review_management_reschedule_date_only_then_time_follow_up(self):
    """Rescheduling accepts the date first and the time in a later message.

    After "amanha" the draft must hold ``nova_data_hora_base``; after "11h"
    the tool is called with the combined value and the draft is removed.
    """

    def frozen_clock():
        return datetime(2026, 3, 12, 9, 0)

    reschedule_draft = {
        "action": "reschedule",
        "payload": {"protocolo": "REV-20260313-F754AF27"},
        "expires_at": utc_now() + timedelta(minutes=30),
    }
    fake_state = FakeState(entries={"pending_review_management_drafts": {21: reschedule_draft}})
    fake_registry = FakeRegistry()
    harness = ReviewFlowHarness(state=fake_state, registry=fake_registry, review_now_provider=frozen_clock)

    async def send(text):
        return await harness._try_handle_review_management(
            message=text,
            user_id=21,
            extracted_fields={},
            intents={},
            turn_decision={"intent": "review_reschedule", "domain": "review", "action": "answer_user"},
        )

    date_reply = await send("amanha")
    partial_draft = fake_state.get_entry("pending_review_management_drafts", 21)
    self.assertIsNotNone(partial_draft)
    self.assertEqual(partial_draft["payload"].get("nova_data_hora_base"), "13/03/2026")
    self.assertIn("Perfeito. Tenho a data 13/03/2026.", date_reply)

    time_reply = await send("11h")
    self.assertIsNone(fake_state.get_entry("pending_review_management_drafts", 21))
    first_call = fake_registry.calls[0]
    self.assertEqual(first_call[0], "editar_data_revisao")
    self.assertEqual(first_call[1]["nova_data_hora"], "13/03/2026 11:00")
    self.assertIn("13/03/2026 11:00", time_reply)
async def test_review_management_infers_listing_intent_from_agendamentos_message(self):
    """A request to list review bookings reaches the listing tool despite a "general" decision."""
    fake_state = FakeState()
    fake_registry = FakeRegistry()
    harness = ReviewFlowHarness(state=fake_state, registry=fake_registry)

    generic_decision = {"intent": "general", "domain": "general", "action": "answer_user"}
    reply = await harness._try_handle_review_management(
        message="liste para mim os meus agendamentos de revisao",
        user_id=21,
        extracted_fields={},
        intents={},
        turn_decision=generic_decision,
    )

    first_call = fake_registry.calls[0]
    self.assertEqual(first_call[0], "listar_agendamentos_revisao")
    self.assertEqual(first_call[1]["limite"], 100)
    self.assertIn("listar_agendamentos_revisao", reply)
async def test_review_schedule_clears_open_management_draft(self):
    """A new scheduling request makes the management handler drop its draft and return None."""
    reschedule_draft = {
        "action": "reschedule",
        "payload": {"protocolo": "REV-20260313-F754AF27"},
        "expires_at": utc_now() + timedelta(minutes=30),
    }
    fake_state = FakeState(entries={"pending_review_management_drafts": {21: reschedule_draft}})
    fake_registry = FakeRegistry()
    harness = ReviewFlowHarness(state=fake_state, registry=fake_registry)

    schedule_decision = {"intent": "review_schedule", "domain": "review", "action": "answer_user"}
    reply = await harness._try_handle_review_management(
        message="quero agendar uma revisao",
        user_id=21,
        extracted_fields={},
        intents={},
        turn_decision=schedule_decision,
    )

    self.assertIsNone(reply)
    self.assertIsNone(fake_state.get_entry("pending_review_management_drafts", 21))
async def test_review_management_does_not_override_open_schedule_draft_without_protocol(self):
    """With a scheduling draft open and no protocol in the message, management stays out.

    The turn is labelled ``review_reschedule``, yet the handler must return
    ``None`` and must not create a management draft.
    """
    open_schedule_payload = {
        "placa": "ABC1234",
        "modelo": "Corolla",
        "ano": 2020,
        "km": 30000,
        "revisao_previa_concessionaria": True,
    }
    draft_entry = {"payload": open_schedule_payload, "expires_at": utc_now() + timedelta(minutes=30)}
    fake_state = FakeState(entries={"pending_review_drafts": {21: draft_entry}})
    fake_registry = FakeRegistry()
    harness = ReviewFlowHarness(state=fake_state, registry=fake_registry)

    reschedule_decision = {"intent": "review_reschedule", "domain": "review", "action": "answer_user"}
    reply = await harness._try_handle_review_management(
        message="pode ser hoje as 17:30",
        user_id=21,
        extracted_fields={},
        intents={},
        turn_decision=reschedule_decision,
    )

    self.assertIsNone(reply)
    self.assertIsNone(fake_state.get_entry("pending_review_management_drafts", 21))
async def test_review_schedule_flow_ignores_management_message_with_protocol(self):
    """The scheduling flow returns None and calls no tool for a cancel request carrying a protocol."""
    review_context = {
        "active_domain": "review",
        "generic_memory": {},
        "shared_memory": {},
        "order_queue": [],
        "pending_order_selection": None,
        "pending_switch": None,
        "last_stock_results": [],
        "selected_vehicle": None,
    }
    fake_state = FakeState(contexts={21: review_context})
    fake_registry = FakeRegistry()
    harness = ReviewFlowHarness(state=fake_state, registry=fake_registry)

    schedule_decision = {"intent": "review_schedule", "domain": "review", "action": "answer_user"}
    reply = await harness._try_collect_and_schedule_review(
        message="eu gostaria de cancelar o meu agendamento REV-20260313-F754AF27",
        user_id=21,
        extracted_fields={},
        intents={},
        turn_decision=schedule_decision,
    )

    self.assertIsNone(reply)
    self.assertEqual(fake_registry.calls, [])
async def test_review_flow_does_not_bootstrap_sales_message_from_active_review_context(self):
    """An active review domain alone must not turn a purchase message into a review draft."""
    review_context = {
        "active_domain": "review",
        "generic_memory": {},
        "shared_memory": {},
        "order_queue": [],
        "pending_order_selection": None,
        "pending_switch": None,
        "last_stock_results": [],
        "selected_vehicle": None,
    }
    fake_state = FakeState(contexts={21: review_context})
    fake_registry = FakeRegistry()
    harness = ReviewFlowHarness(state=fake_state, registry=fake_registry)

    generic_decision = {"intent": "general", "domain": "general", "action": "answer_user"}
    reply = await harness._try_collect_and_schedule_review(
        message="quero comprar um carro de ate 70 mil",
        user_id=21,
        extracted_fields={},
        intents={},
        turn_decision=generic_decision,
    )

    self.assertIsNone(reply)
    self.assertIsNone(fake_state.get_entry("pending_review_drafts", 21))
class ContextSwitchPolicyTests(unittest.TestCase):
    """Tests for ``ConversationPolicy`` context-switch handling and its messages."""

    @staticmethod
    def _state_with_pending_review_reuse():
        """Return a state where user 9 is in the review domain with a reuse confirmation pending."""
        reuse_entry = {
            "payload": {
                "placa": "ABC1463",
                "modelo": "Civic",
                "ano": 2024,
                "km": 30000,
                "revisao_previa_concessionaria": False,
            },
            "expires_at": utc_now() + timedelta(minutes=15),
        }
        review_context = {
            "pending_switch": None,
            "active_domain": "review",
            "generic_memory": {},
            "pending_order_selection": None,
        }
        return FakeState(
            entries={"pending_review_reuse_confirmations": {9: reuse_entry}},
            contexts={9: review_context},
        )

    def test_handle_context_switch_ignores_ambiguous_confirmation_while_review_reuse_is_pending(self):
        """A bare "sim" during a pending reuse prompt must not start a domain switch."""
        fake_service = FakeService(self._state_with_pending_review_reuse())
        policy = ConversationPolicy(service=fake_service)

        outcome = policy.handle_context_switch(
            message="sim",
            user_id=9,
            target_domain_hint="sales",
            turn_decision={"domain": "sales", "intent": "order_create", "action": "answer_user"},
        )

        self.assertIsNone(outcome)
        self.assertIsNone(fake_service._get_user_context(9).get("pending_switch"))

    def test_handle_context_switch_still_confirms_explicit_domain_change_with_open_review_flow(self):
        """An explicit purchase request still produces the switch confirmation question."""
        fake_service = FakeService(self._state_with_pending_review_reuse())
        policy = ConversationPolicy(service=fake_service)

        outcome = policy.handle_context_switch(
            message="agora quero comprar um carro",
            user_id=9,
            target_domain_hint="sales",
            turn_decision={"domain": "sales", "intent": "order_create", "action": "answer_user"},
        )

        expected_question = (
            "Entendi que voce quer sair de agendamento de revisao e ir para compra de veiculo. Tem certeza?"
        )
        self.assertEqual(outcome, expected_question)
        self.assertIsNotNone(fake_service._get_user_context(9).get("pending_switch"))

    def test_render_context_switched_message_guides_next_step_for_sales(self):
        """The post-switch message for sales names the new context and suggests what to say next."""
        policy = ConversationPolicy(service=FakeService(FakeState()))

        rendered = policy.render_context_switched_message("sales")

        expected_message = (
            "Certo, contexto anterior encerrado. Vamos seguir com compra de veiculo.\n"
            "Pode me dizer a faixa de preco, o modelo ou o tipo de carro que voce procura."
        )
        self.assertEqual(rendered, expected_message)

    def test_handle_context_switch_drops_stale_pending_switch_when_user_starts_other_domain(self):
        """A pending switch towards sales is discarded when the user starts a review request."""
        stale_switch = {
            "target_domain": "sales",
            "expires_at": utc_now() + timedelta(minutes=15),
        }
        general_context = {
            "pending_switch": stale_switch,
            "active_domain": "general",
            "generic_memory": {},
            "pending_order_selection": None,
        }
        fake_service = FakeService(FakeState(contexts={9: general_context}))
        policy = ConversationPolicy(service=fake_service)

        outcome = policy.handle_context_switch(
            message="quero agendar revisao",
            user_id=9,
            target_domain_hint="review",
            turn_decision={"domain": "review", "intent": "review_schedule", "action": "collect_review_schedule"},
        )

        self.assertIsNone(outcome)
        self.assertIsNone(fake_service._get_user_context(9).get("pending_switch"))
class ToolRegistryExecutionTests(unittest.IsolatedAsyncioTestCase):
    """Tests for how ``ToolRegistry.execute`` passes arguments to a handler."""

    async def test_execute_ignores_extra_arguments_for_review_listing_tool(self):
        """An argument the handler does not declare ("tipo") must not reach it."""

        async def fake_listar_agendamentos_revisao(
            user_id: int | None = None,
            placa: str | None = None,
            status: str | None = None,
            limite: int | None = 20,
        ):
            # Echo the received arguments so the test can see what was forwarded.
            return [dict(user_id=user_id, placa=placa, status=status, limite=limite)]

        listing_tool = ToolDefinition(
            name="listar_agendamentos_revisao",
            description="",
            parameters={},
            handler=fake_listar_agendamentos_revisao,
        )
        # Build the registry without running __init__ and install only the fake tool.
        bare_registry = ToolRegistry.__new__(ToolRegistry)
        bare_registry._tools = [listing_tool]

        call_arguments = {"placa": "ABC1234", "status": "agendado", "limite": 10, "tipo": "revisao"}
        outcome = await bare_registry.execute(
            "listar_agendamentos_revisao",
            call_arguments,
            user_id=21,
        )

        expected_rows = [{"user_id": 21, "placa": "ABC1234", "status": "agendado", "limite": 10}]
        self.assertEqual(outcome, expected_rows)
# Run the suite with unittest's runner when this file is executed as a script.
if __name__ == "__main__":
    unittest.main()