import os import unittest from datetime import datetime, timedelta from app.core.time_utils import utc_now from unittest.mock import patch os.environ.setdefault("DEBUG", "false") from fastapi import HTTPException from app.services.flows.order_flow import OrderFlowMixin from app.services.flows.rental_flow import RentalFlowMixin from app.services.flows.review_flow import ReviewFlowMixin from app.integrations.telegram_satellite_service import _ensure_supported_runtime_configuration, _split_telegram_text from app.models.tool_model import ToolDefinition from app.services.orchestration.conversation_policy import ConversationPolicy from app.services.orchestration.entity_normalizer import EntityNormalizer from app.services.orchestration.response_formatter import fallback_format_tool_result from app.services.tools.tool_registry import ToolRegistry from app.services.tools.handlers import _parse_data_hora_revisao class FakeState: def __init__(self, entries=None, contexts=None): self.entries = entries or {} self.contexts = contexts or {} def get_entry(self, bucket: str, user_id: int | None, *, expire: bool = False): if user_id is None: return None return self.entries.get(bucket, {}).get(user_id) def set_entry(self, bucket: str, user_id: int | None, value: dict): if user_id is None: return self.entries.setdefault(bucket, {})[user_id] = value def pop_entry(self, bucket: str, user_id: int | None): if user_id is None: return None return self.entries.get(bucket, {}).pop(user_id, None) def get_user_context(self, user_id: int | None): if user_id is None: return None return self.contexts.get(user_id) def save_user_context(self, user_id: int | None, context: dict): if user_id is None: return self.contexts[user_id] = context class FakeService: def __init__(self, state): self.state = state self.normalizer = EntityNormalizer() def _is_affirmative_message(self, text: str) -> bool: normalized = self.normalizer.normalize_text(text).strip().rstrip(".!?,;:") return normalized in {"sim", "pode", "ok", "confirmo", "aceito", "fechado", "pode sim", "tenho", "tenho sim"} def _get_user_context(self, user_id: int | None): return self.state.get_user_context(user_id) def _save_user_context(self, user_id: int | None, context: dict | None) -> None: if user_id is None or not isinstance(context, dict): return self.state.save_user_context(user_id, context) class FakeRegistry: def __init__(self): self.calls = [] self.raise_http_exception = None async def execute(self, tool_name: str, arguments: dict, user_id: int | None = None): self.calls.append((tool_name, arguments, user_id)) if self.raise_http_exception is not None: raise self.raise_http_exception if tool_name == "consultar_estoque": stock_results = [ {"id": 1, "modelo": "Honda Civic 2021", "categoria": "sedan", "preco": 48500.0}, {"id": 2, "modelo": "Toyota Yaris 2020", "categoria": "hatch", "preco": 49900.0}, ] reverse = str(arguments.get("ordenar_preco") or "asc").lower() == "desc" return sorted(stock_results, key=lambda item: float(item["preco"]), reverse=reverse) if tool_name == "consultar_frota_aluguel": return [ {"id": 1, "placa": "RAA1A01", "modelo": "Chevrolet Tracker", "categoria": "suv", "ano": 2024, "valor_diaria": 219.9, "status": "disponivel"}, {"id": 2, "placa": "RAA1A02", "modelo": "Fiat Pulse", "categoria": "suv", "ano": 2024, "valor_diaria": 189.9, "status": "disponivel"}, ] if tool_name == "abrir_locacao_aluguel": return { "contrato_numero": "LOC-TESTE-123", "placa": arguments.get("placa") or "RAA1A01", "modelo_veiculo": "Chevrolet Tracker", "categoria": "suv", "data_inicio": arguments["data_inicio"], "data_fim_prevista": arguments["data_fim_prevista"], "valor_diaria": 219.9, "valor_previsto": 659.7, "status": "ativa", "status_veiculo": "alugado", "cpf": arguments.get("cpf"), } if tool_name == "listar_pedidos": return [ { "numero_pedido": "PED-TESTE-001", "modelo_veiculo": "Fiat Argo 2020", "valor_veiculo": 61857.0, "status": "Ativo", }, { "numero_pedido": "PED-TESTE-002", "modelo_veiculo": "Toyota Corolla 2020", "valor_veiculo": 58476.0, "status": "Cancelado", }, ] if tool_name == "realizar_pedido": vehicle_map = { 1: ("Honda Civic 2021", 51524.0), 2: ("Toyota Corolla 2020", 58476.0), 3: ("Chevrolet Onix 2022", 51809.0), 7: ("Fiat Argo 2020", 61857.0), 9: ("Hyundai HB20S 2022", 76000.0), } modelo_veiculo, valor_veiculo = vehicle_map[arguments["vehicle_id"]] return { "numero_pedido": "PED-TESTE-123", "status": "Ativo", "modelo_veiculo": modelo_veiculo, "valor_veiculo": valor_veiculo, } if tool_name == "agendar_revisao": return { "protocolo": "REV-TESTE-123", "placa": arguments["placa"], "data_hora": arguments["data_hora"], "valor_revisao": 840.60, } if tool_name == "listar_agendamentos_revisao": return [ { "protocolo": "REV-TESTE-001", "placa": "ABC1234", "data_hora": "13/03/2026 16:00", "status": "Agendado", } ] if tool_name == "cancelar_agendamento_revisao": return { "protocolo": arguments["protocolo"], "placa": "ABC1269", "data_hora": "13/03/2026 16:00", "status": "Cancelado", } if tool_name == "editar_data_revisao": return { "protocolo": arguments["protocolo"], "placa": "ABC1269", "data_hora": arguments["nova_data_hora"], "status": "Remarcado", } return { "numero_pedido": arguments["numero_pedido"], "status": "Cancelado", "motivo": arguments["motivo"], } def coerce_http_error(self, exc): detail = exc.detail if isinstance(exc.detail, dict) else {"message": str(exc.detail)} return { "code": detail.get("code", "tool_error"), "message": detail.get("message", str(exc.detail)), "retryable": bool(detail.get("retryable", False)), "field": detail.get("field"), } class OrderFlowHarness(OrderFlowMixin): def __init__(self, state, registry): self.state = state self.registry = registry self.tool_executor = registry self.normalizer = EntityNormalizer() def _get_user_context(self, user_id: int | None): return self.state.get_user_context(user_id) def _save_user_context(self, user_id: int | None, context: dict | None) -> None: if user_id is None or not isinstance(context, dict): return self.state.save_user_context(user_id, context) def _normalize_intents(self, data) -> dict: return self.normalizer.normalize_intents(data) def _normalize_cancel_order_fields(self, data) -> dict: return self.normalizer.normalize_cancel_order_fields(data) def _normalize_order_fields(self, data) -> dict: return self.normalizer.normalize_order_fields(data) def _normalize_text(self, text: str) -> str: return self.normalizer.normalize_text(text) def _is_affirmative_message(self, text: str) -> bool: normalized = self.normalizer.normalize_text(text).strip().rstrip(".!?,;:") return normalized in {"sim", "pode", "ok", "confirmo", "aceito", "fechado", "pode sim", "continuar"} def _is_negative_message(self, text: str) -> bool: normalized = self.normalizer.normalize_text(text).strip().rstrip(".!?,;:") return normalized in {"nao", "n\u00e3o", "negativo", "cancelar", "outro", "outra opcao", "outra op\u00e7\u00e3o"} def _normalize_positive_number(self, value): return self.normalizer.normalize_positive_number(value) def _http_exception_detail(self, exc) -> str: return str(exc) def _fallback_format_tool_result(self, tool_name: str, tool_result) -> str: if tool_name == "consultar_estoque": lines = [f"Encontrei {len(tool_result)} veiculo(s):"] for idx, item in enumerate(tool_result, start=1): lines.append(f"{idx}. {item['modelo']} ({item['categoria']}) - R$ {item['preco']:.2f}") lines.append("Para escolher, responda com o numero da opcao desejada. Exemplo: 1.") return "\n".join(lines) if tool_name == "realizar_pedido": return ( f"Pedido criado com sucesso.\n" f"Numero: {tool_result['numero_pedido']}\n" f"Veiculo: {tool_result['modelo_veiculo']}\n" f"Valor: R$ {tool_result['valor_veiculo']:.2f}" ) if tool_name == "listar_pedidos": lines = [f"Encontrei {len(tool_result)} pedido(s):"] for idx, item in enumerate(tool_result, start=1): lines.append( f"{idx}. {item['numero_pedido']} | {item['modelo_veiculo']} | " f"{item['status']} | R$ {item['valor_veiculo']:.2f}" ) return "\n".join(lines) return ( f"Pedido {tool_result['numero_pedido']} atualizado.\n" f"Status: {tool_result['status']}\n" f"Motivo: {tool_result['motivo']}" ) def _try_prefill_order_cpf_from_user_profile(self, user_id: int | None, payload: dict) -> None: return None def _load_vehicle_by_id(self, vehicle_id: int) -> dict | None: for context in self.state.contexts.values(): for item in context.get("last_stock_results", []): if int(item["id"]) == int(vehicle_id): return dict(item) return None class RentalFlowHarness(RentalFlowMixin): def __init__(self, state, registry, rental_now_provider=None): self.state = state self.registry = registry self.tool_executor = registry self.normalizer = EntityNormalizer() self._rental_now_provider = rental_now_provider def _get_user_context(self, user_id: int | None): return self.state.get_user_context(user_id) def _save_user_context(self, user_id: int | None, context: dict | None) -> None: if user_id is None or not isinstance(context, dict): return self.state.save_user_context(user_id, context) def _normalize_text(self, text: str) -> str: return self.normalizer.normalize_text(text) def _decision_intent(self, turn_decision: dict | None) -> str: return str((turn_decision or {}).get("intent") or "").strip().lower() def _http_exception_detail(self, exc) -> str: return str(exc) def _reset_pending_rental_states(self, user_id: int | None) -> None: if user_id is None: return self.state.pop_entry("pending_rental_drafts", user_id) self.state.pop_entry("pending_rental_selections", user_id) context = self._get_user_context(user_id) if isinstance(context, dict): context["last_rental_results"] = [] context["selected_rental_vehicle"] = None if context.get("active_task") == "rental_create": context["active_task"] = None if str(context.get("active_domain") or "").strip().lower() == "rental": context["active_domain"] = "general" self._save_user_context(user_id=user_id, context=context) def _fallback_format_tool_result(self, tool_name: str, tool_result) -> str: if tool_name == "consultar_frota_aluguel": lines = [f"Encontrei {len(tool_result)} veiculo(s) para locacao:"] for idx, item in enumerate(tool_result, start=1): lines.append( f"{idx}. {item['modelo']} {item['ano']} | {item['placa']} | " f"{item['categoria']} | diaria R$ {item['valor_diaria']:.2f}" ) lines.append("Para seguir com a locacao, informe a placa ou o numero da opcao desejada.") return "\n".join(lines) if tool_name == "abrir_locacao_aluguel": return ( "Locacao aberta com sucesso.\n" f"Contrato: {tool_result['contrato_numero']}\n" f"Placa: {tool_result['placa']}\n" f"Inicio: {tool_result['data_inicio']}\n" f"Devolucao prevista: {tool_result['data_fim_prevista']}" ) return str(tool_result) class ReviewFlowHarness(ReviewFlowMixin): def __init__(self, state, registry, review_now_provider=None): self.state = state self.registry = registry self.tool_executor = registry self.normalizer = EntityNormalizer() self.captured_suggestions = [] self.logged_events = [] self._review_now_provider = review_now_provider def _normalize_intents(self, data) -> dict: return self.normalizer.normalize_intents(data) def _normalize_review_fields(self, data) -> dict: return self.normalizer.normalize_review_fields(data) def _normalize_review_management_fields(self, data) -> dict: return self.normalizer.normalize_review_management_fields(data) def _normalize_text(self, text: str) -> str: return self.normalizer.normalize_text(text) def _normalize_review_datetime_text(self, value) -> str | None: return self.normalizer.normalize_review_datetime_text(value) def _http_exception_detail(self, exc) -> str: detail = exc.detail if isinstance(exc.detail, dict) else {} return str(detail.get("message") or exc) def _get_user_context(self, user_id: int | None): return self.state.get_user_context(user_id) def _save_user_context(self, user_id: int | None, context: dict | None) -> None: if user_id is None or not isinstance(context, dict): return self.state.save_user_context(user_id, context) def _fallback_format_tool_result(self, tool_name: str, tool_result) -> str: return f"{tool_name}:{tool_result}" def _extract_review_protocol_from_text(self, text: str) -> str | None: return self.normalizer.extract_review_protocol_from_text(text) def _is_affirmative_message(self, text: str) -> bool: normalized = self.normalizer.normalize_text(text).strip().rstrip(".!?,;:") return normalized in {"sim", "pode", "ok", "confirmo", "aceito", "fechado", "pode sim", "tenho", "tenho sim"} def _is_negative_message(self, text: str) -> bool: normalized = self.normalizer.normalize_text(text).strip().rstrip(".!?,;:") return normalized in {"nao", "nao quero", "prefiro outro", "outro horario"} or normalized.startswith("nao") def _capture_review_confirmation_suggestion(self, **kwargs) -> None: self.captured_suggestions.append(kwargs) def _try_prefill_review_fields_from_memory(self, user_id: int | None, payload: dict) -> None: return None def _log_turn_event(self, event: str, **payload) -> None: self.logged_events.append((event, payload)) def _reset_pending_review_states(self, user_id: int | None) -> None: self.state.pop_entry("pending_review_drafts", user_id) self.state.pop_entry("pending_review_management_drafts", user_id) self.state.pop_entry("pending_review_confirmations", user_id) self.state.pop_entry("pending_review_reuse_confirmations", user_id) class ConversationAdjustmentsTests(unittest.TestCase): def test_telegram_satellite_requires_redis_in_production(self): with patch("app.integrations.telegram_satellite_service.settings.environment", "production"), patch( "app.integrations.telegram_satellite_service.settings.conversation_state_backend", "memory", ): with self.assertRaises(RuntimeError): _ensure_supported_runtime_configuration() def test_telegram_satellite_allows_redis_in_production(self): with patch("app.integrations.telegram_satellite_service.settings.environment", "production"), patch( "app.integrations.telegram_satellite_service.settings.conversation_state_backend", "redis", ): _ensure_supported_runtime_configuration() def test_telegram_satellite_splits_long_messages_safely(self): text = ("A" * 3900) + "\n" + ("B" * 3900) + "\n" + ("C" * 3900) chunks = _split_telegram_text(text) self.assertGreater(len(chunks), 1) self.assertTrue(all(len(chunk) <= 3800 for chunk in chunks)) rebuilt = "\n".join(chunks) self.assertEqual(rebuilt.replace("\n", ""), text.replace("\n", "")) self.assertIn("A" * 100, rebuilt) self.assertIn("B" * 100, rebuilt) self.assertIn("C" * 100, rebuilt) def test_review_listing_formatter_is_compact_and_complete(self): response = fallback_format_tool_result( "listar_agendamentos_revisao", [ { "protocolo": "REV-1", "placa": "ABC1234", "data_hora": "2026-03-19T09:30:00", "status": "agendado", }, { "protocolo": "REV-2", "placa": "XYZ9999", "data_hora": "2026-03-20T10:00:00", "status": "cancelado", }, ], ) self.assertIn("Voce tem 2 agendamento(s):", response) self.assertIn("1. REV-1 | ABC1234 |", response) self.assertIn("2. REV-2 | XYZ9999 |", response) self.assertNotIn("\n\n", response) def test_rental_open_formatter_marks_payment_as_pending(self): response = fallback_format_tool_result( "abrir_locacao_aluguel", { "contrato_numero": "LOC-20260318-FE69BCF0", "placa": "RAA1A12", "modelo_veiculo": "Peugeot 208", "data_inicio": "2026-03-20T10:00:00", "data_fim_prevista": "2026-03-23T10:00:00", "valor_diaria": 149.9, "valor_previsto": 449.7, }, ) self.assertIn("Pagamento: em aberto", response) self.assertIn("testar o comprovante", response) def test_defer_flow_cancel_when_order_cancel_draft_waits_for_reason(self): state = FakeState( entries={ "pending_cancel_order_drafts": { 7: { "payload": {"numero_pedido": "PED-20260305120000-ABC123"}, "expires_at": utc_now() + timedelta(minutes=30), } } } ) policy = ConversationPolicy(service=FakeService(state)) self.assertTrue(policy.should_defer_flow_cancellation_control("desisti", user_id=7)) self.assertFalse(policy.should_defer_flow_cancellation_control("cancelar fluxo atual", user_id=7)) def test_defer_flow_cancel_when_review_reuse_confirmation_is_pending(self): state = FakeState( entries={ "pending_review_reuse_confirmations": { 7: { "payload": { "placa": "ABC1234", "modelo": "Corolla", "ano": 2020, "km": 30000, "revisao_previa_concessionaria": True, }, "expires_at": utc_now() + timedelta(minutes=30), } } } ) policy = ConversationPolicy(service=FakeService(state)) self.assertTrue(policy.should_defer_flow_cancellation_control("nao", user_id=7)) self.assertFalse(policy.should_defer_flow_cancellation_control("cancelar fluxo atual", user_id=7)) def test_defer_flow_cancel_for_fresh_sales_request_without_open_flow(self): state = FakeState( contexts={ 7: { "active_domain": "sales", "pending_order_selection": None, "pending_switch": None, } } ) policy = ConversationPolicy(service=FakeService(state)) self.assertTrue(policy.should_defer_flow_cancellation_control("agora eu quero comprar um carro de ate 70 mil", user_id=7)) self.assertFalse(policy.should_defer_flow_cancellation_control("cancelar fluxo atual", user_id=7)) def test_normalize_datetime_connector_accepts_as_com_acento(self): normalizer = EntityNormalizer() self.assertEqual( normalizer.normalize_datetime_connector("10/03/2026 \u00e0s 09:00"), "10/03/2026 09:00", ) def test_parse_review_datetime_accepts_as_com_acento(self): parsed = _parse_data_hora_revisao("10/03/2026 \u00e0s 09:00") self.assertEqual(parsed, datetime(2026, 3, 10, 9, 0)) def test_normalize_review_datetime_extracts_datetime_from_long_review_sentence(self): normalizer = EntityNormalizer() self.assertEqual( normalizer.normalize_review_datetime_text( "para ABC1234 em 28/03/2026 as 8:00, Corolla, 2020, 30000 km, ja fiz revisao" ), "28/03/2026 08:00", ) def test_normalize_review_fields_discards_invalid_datetime_noise(self): normalizer = EntityNormalizer() self.assertEqual( normalizer.normalize_review_fields({"data_hora": "quero agendar uma revisao qualquer"}), {}, ) def test_reset_message_variants_strip_previous_context_prefix(self): state = FakeState() policy = ConversationPolicy(service=FakeService(state)) message = "Esque\u00e7a as opera\u00e7\u00f5es anteriores, agora quero agendar revis\u00e3o para ABC1234" cleaned = policy.remove_order_selection_reset_prefix(message) self.assertTrue(policy.is_order_selection_reset_message(message)) self.assertEqual(cleaned, "quero agendar revis\u00e3o para ABC1234") class CancelOrderFlowTests(unittest.IsolatedAsyncioTestCase): async def test_cancel_order_flow_accepts_turn_decision_without_legacy_intents(self): state = FakeState() registry = FakeRegistry() flow = OrderFlowHarness(state=state, registry=registry) response = await flow._try_collect_and_cancel_order( message="cancelar pedido PED-20260305120000-ABC123", user_id=42, extracted_fields={"numero_pedido": "PED-20260305120000-ABC123"}, intents={}, turn_decision={"intent": "order_cancel", "domain": "sales", "action": "collect_order_cancel"}, ) self.assertEqual(response, "Encontrei o pedido informado. Qual o motivo do cancelamento?") self.assertIsNotNone(state.get_entry("pending_cancel_order_drafts", 42)) async def test_cancel_order_flow_consumes_free_text_reason(self): state = FakeState( entries={ "pending_cancel_order_drafts": { 42: { "payload": {"numero_pedido": "PED-20260305120000-ABC123"}, "expires_at": utc_now() + timedelta(minutes=30), } } } ) registry = FakeRegistry() flow = OrderFlowHarness(state=state, registry=registry) response = await flow._try_collect_and_cancel_order( message="desisti", user_id=42, extracted_fields={}, intents={}, ) self.assertEqual(len(registry.calls), 1) tool_name, arguments, tool_user_id = registry.calls[0] self.assertEqual(tool_name, "cancelar_pedido") self.assertEqual(tool_user_id, 42) self.assertEqual(arguments["numero_pedido"], "PED-20260305120000-ABC123") self.assertEqual(arguments["motivo"], "desisti") self.assertIn("Status: Cancelado", response) self.assertIsNone(state.get_entry("pending_cancel_order_drafts", 42)) async def test_cancel_order_flow_consumes_free_text_reason_even_when_model_repeats_order_cancel_intent(self): state = FakeState( entries={ "pending_cancel_order_drafts": { 42: { "payload": {"numero_pedido": "PED-20260305120000-ABC123"}, "expires_at": utc_now() + timedelta(minutes=30), } } } ) registry = FakeRegistry() flow = OrderFlowHarness(state=state, registry=registry) response = await flow._try_collect_and_cancel_order( message="Eu desisti dessa compra", user_id=42, extracted_fields={}, intents={}, turn_decision={"intent": "order_cancel", "domain": "sales", "action": "answer_user"}, ) self.assertEqual(len(registry.calls), 1) tool_name, arguments, tool_user_id = registry.calls[0] self.assertEqual(tool_name, "cancelar_pedido") self.assertEqual(tool_user_id, 42) self.assertEqual(arguments["numero_pedido"], "PED-20260305120000-ABC123") self.assertEqual(arguments["motivo"], "Eu desisti dessa compra") self.assertIn("Pedido PED-20260305120000-ABC123 atualizado.", response) self.assertIn("Status: Cancelado", response) self.assertIsNone(state.get_entry("pending_cancel_order_drafts", 42)) async def test_cancel_order_flow_still_requests_reason_when_message_is_too_short(self): state = FakeState( entries={ "pending_cancel_order_drafts": { 42: { "payload": {"numero_pedido": "PED-20260305120000-ABC123"}, "expires_at": utc_now() + timedelta(minutes=30), } } } ) registry = FakeRegistry() flow = OrderFlowHarness(state=state, registry=registry) response = await flow._try_collect_and_cancel_order( message="ok", user_id=42, extracted_fields={}, intents={}, ) self.assertEqual(registry.calls, []) self.assertIn("o motivo do cancelamento", response) self.assertIsNotNone(state.get_entry("pending_cancel_order_drafts", 42)) async def test_cancel_order_flow_does_not_override_active_order_creation_draft(self): state = FakeState( entries={ "pending_order_drafts": { 42: { "payload": {}, "expires_at": utc_now() + timedelta(minutes=30), } } } ) registry = FakeRegistry() flow = OrderFlowHarness(state=state, registry=registry) response = await flow._try_collect_and_cancel_order( message="2", user_id=42, extracted_fields={}, intents={"order_cancel": True}, ) self.assertIsNone(response) self.assertEqual(registry.calls, []) async def test_cancel_order_flow_restores_draft_from_context_snapshot_when_bucket_is_missing(self): state = FakeState( contexts={ 42: { "active_domain": "sales", "active_task": "order_cancel", "generic_memory": {}, "shared_memory": {}, "collected_slots": { "order_cancel": {"numero_pedido": "PED-20260305120000-ABC123"}, }, "flow_snapshots": { "order_cancel": { "payload": {"numero_pedido": "PED-20260305120000-ABC123"}, "expires_at": utc_now() + timedelta(minutes=30), } }, "last_stock_results": [], "selected_vehicle": None, } } ) registry = FakeRegistry() flow = OrderFlowHarness(state=state, registry=registry) response = await flow._try_collect_and_cancel_order( message="desisti da compra", user_id=42, extracted_fields={}, intents={}, ) self.assertEqual(len(registry.calls), 1) tool_name, arguments, tool_user_id = registry.calls[0] self.assertEqual(tool_name, "cancelar_pedido") self.assertEqual(tool_user_id, 42) self.assertEqual(arguments["numero_pedido"], "PED-20260305120000-ABC123") self.assertEqual(arguments["motivo"], "desisti da compra") self.assertIn("Status: Cancelado", response) class CreateOrderFlowWithVehicleTests(unittest.IsolatedAsyncioTestCase): async def test_order_listing_preserves_open_order_draft(self): state = FakeState( entries={ "pending_order_drafts": { 10: { "payload": {"cpf": "12345678909"}, "expires_at": utc_now() + timedelta(minutes=30), } } } ) registry = FakeRegistry() flow = OrderFlowHarness(state=state, registry=registry) response = await flow._try_handle_order_listing( message="Liste os meus pedidos", user_id=10, intents={}, turn_decision={"intent": "order_list", "domain": "sales", "action": "call_tool"}, ) self.assertEqual(registry.calls[0][0], "listar_pedidos") self.assertEqual(registry.calls[0][1]["limite"], 50) self.assertIn("Encontrei 2 pedido(s):", response) self.assertIsNotNone(state.get_entry("pending_order_drafts", 10)) async def test_order_listing_ignores_review_appointment_listing_message(self): state = FakeState() registry = FakeRegistry() flow = OrderFlowHarness(state=state, registry=registry) response = await flow._try_handle_order_listing( message="liste para mim os meus agendamentos de revisao", user_id=10, intents={}, turn_decision={"intent": "order_list", "domain": "sales", "action": "call_tool"}, ) self.assertIsNone(response) self.assertEqual(registry.calls, []) async def test_order_flow_returns_clear_guidance_for_invalid_cpf_follow_up(self): state = FakeState( entries={ "pending_order_drafts": { 10: { "payload": {"vehicle_id": 1, "modelo_veiculo": "Honda Civic 2021", "valor_veiculo": 48500.0}, "expires_at": utc_now() + timedelta(minutes=30), } } }, contexts={ 10: { "generic_memory": {}, "shared_memory": {}, "last_stock_results": [ {"id": 1, "modelo": "Honda Civic 2021", "categoria": "sedan", "preco": 48500.0} ], "selected_vehicle": {"id": 1, "modelo": "Honda Civic 2021", "categoria": "sedan", "preco": 48500.0}, } } ) registry = FakeRegistry() flow = OrderFlowHarness(state=state, registry=registry) response = await flow._try_collect_and_create_order( message="123", user_id=10, extracted_fields={}, intents={}, turn_decision={"intent": "order_create", "domain": "sales", "action": "collect_order_create"}, ) self.assertEqual(response, "Para seguir com o pedido, preciso de um CPF valido. Pode me informar novamente?") self.assertEqual(registry.calls, []) self.assertIsNotNone(state.get_entry("pending_order_drafts", 10)) async def test_order_flow_rejects_cpf_linked_to_other_user_without_resetting_vehicle_choice(self): state = FakeState( entries={ "pending_order_drafts": { 10: { "payload": {"vehicle_id": 1}, "expires_at": utc_now() + timedelta(minutes=30), } } }, contexts={ 10: { "generic_memory": {}, "shared_memory": {}, "last_stock_results": [ {"id": 1, "modelo": "Honda Civic 2021", "categoria": "sedan", "preco": 48500.0} ], "selected_vehicle": {"id": 1, "modelo": "Honda Civic 2021", "categoria": "sedan", "preco": 48500.0}, } } ) registry = FakeRegistry() flow = OrderFlowHarness(state=state, registry=registry) async def fake_hydrate_mock_customer_from_cpf(cpf: str, user_id: int | None = None): raise ValueError("cpf_already_linked") with patch( "app.services.flows.order_flow.hydrate_mock_customer_from_cpf", new=fake_hydrate_mock_customer_from_cpf, ): response = await flow._try_collect_and_create_order( message="12345678909", user_id=10, extracted_fields={}, intents={}, turn_decision={"intent": "order_create", "domain": "sales", "action": "collect_order_create"}, ) self.assertEqual(response, "Este CPF ja esta vinculado a outro usuario. Pode me informar um CPF diferente?") self.assertEqual(registry.calls, []) self.assertIsNotNone(state.get_entry("pending_order_drafts", 10)) self.assertEqual(state.get_user_context(10)["selected_vehicle"]["id"], 1) async def test_order_flow_auto_lists_stock_on_first_purchase_message_when_budget_exists(self): state = FakeState( contexts={ 10: { "generic_memory": {"cpf": "12345678909", "orcamento_max": 50000}, "last_stock_results": [], "selected_vehicle": None, } } ) registry = FakeRegistry() flow = OrderFlowHarness(state=state, registry=registry) async def fake_hydrate_mock_customer_from_cpf(cpf: str, user_id: int | None = None): return {"cpf": cpf, "user_id": user_id} with patch( "app.services.flows.order_flow.hydrate_mock_customer_from_cpf", new=fake_hydrate_mock_customer_from_cpf, ): response = await flow._try_collect_and_create_order( message="Quero comprar um carro de 50 mil, meu CPF e 12345678909", user_id=10, extracted_fields={"cpf": "12345678909"}, intents={}, turn_decision={"intent": "order_create", "domain": "sales", "action": "collect_order_create"}, ) self.assertIn("Encontrei 2 veiculo(s):", response) self.assertIn("Honda Civic 2021", response) async def test_order_flow_budget_search_prioritizes_matches_closest_to_ceiling(self): state = FakeState( contexts={ 10: { "generic_memory": {"cpf": "12345678909", "orcamento_max": 50000}, "last_stock_results": [], "selected_vehicle": None, } } ) registry = FakeRegistry() flow = OrderFlowHarness(state=state, registry=registry) async def fake_hydrate_mock_customer_from_cpf(cpf: str, user_id: int | None = None): return {"cpf": cpf, "user_id": user_id} with patch( "app.services.flows.order_flow.hydrate_mock_customer_from_cpf", new=fake_hydrate_mock_customer_from_cpf, ): response = await flow._try_collect_and_create_order( message="Quero comprar um carro de ate 50 mil, meu CPF e 12345678909", user_id=10, extracted_fields={"cpf": "12345678909"}, intents={}, turn_decision={"intent": "order_create", "domain": "sales", "action": "collect_order_create"}, ) self.assertEqual(registry.calls[0][1]["ordenar_preco"], "desc") self.assertIn("1. Toyota Yaris 2020 (hatch) - R$ 49900.00", response) self.assertIn("2. Honda Civic 2021 (sedan) - R$ 48500.00", response) self.assertEqual(flow._get_last_stock_results(user_id=10)[0]["modelo"], "Toyota Yaris 2020") async def test_order_flow_budget_search_keeps_cheapest_first_when_user_asks_for_cheapest(self): state = FakeState( contexts={ 10: { "generic_memory": {"cpf": "12345678909", "orcamento_max": 50000}, "last_stock_results": [], "selected_vehicle": None, } } ) registry = FakeRegistry() flow = OrderFlowHarness(state=state, registry=registry) async def fake_hydrate_mock_customer_from_cpf(cpf: str, user_id: int | None = None): return {"cpf": cpf, "user_id": user_id} with patch( "app.services.flows.order_flow.hydrate_mock_customer_from_cpf", new=fake_hydrate_mock_customer_from_cpf, ): response = await flow._try_collect_and_create_order( message="Quero comprar o carro mais barato ate 50 mil, meu CPF e 12345678909", user_id=10, extracted_fields={"cpf": "12345678909"}, intents={}, turn_decision={"intent": "order_create", "domain": "sales", "action": "collect_order_create"}, ) self.assertEqual(registry.calls[0][1]["ordenar_preco"], "asc") self.assertIn("1. Honda Civic 2021 (sedan) - R$ 48500.00", response) self.assertIn("2. Toyota Yaris 2020 (hatch) - R$ 49900.00", response) self.assertEqual(flow._get_last_stock_results(user_id=10)[0]["modelo"], "Honda Civic 2021") async def test_order_flow_extracts_budget_from_message_when_llm_misses_it(self): state = FakeState( contexts={ 10: { "generic_memory": {}, "shared_memory": {}, "last_stock_results": [], "selected_vehicle": None, } } ) registry = FakeRegistry() flow = OrderFlowHarness(state=state, registry=registry) async def fake_hydrate_mock_customer_from_cpf(cpf: str, user_id: int | None = None): return {"cpf": cpf, "user_id": user_id} with patch( "app.services.flows.order_flow.hydrate_mock_customer_from_cpf", new=fake_hydrate_mock_customer_from_cpf, ): response = await flow._try_collect_and_create_order( message="Quero comprar um carro de 50 mil, meu CPF e 12345678909", user_id=10, extracted_fields={}, intents={}, turn_decision={"intent": "order_create", "domain": "sales", "action": "collect_order_create"}, ) self.assertIn("Encontrei 2 veiculo(s):", response) self.assertEqual(state.get_user_context(10)["generic_memory"]["orcamento_max"], 50000) async def test_order_flow_extracts_cpf_from_followup_message_when_llm_misses_it(self): state = FakeState( entries={ "pending_order_drafts": { 10: { "payload": {}, "expires_at": utc_now() + timedelta(minutes=30), } } }, contexts={ 10: { "generic_memory": {"orcamento_max": 50000}, "shared_memory": {"orcamento_max": 50000}, "last_stock_results": [], "selected_vehicle": None, } }, ) registry = FakeRegistry() flow = OrderFlowHarness(state=state, registry=registry) async def fake_hydrate_mock_customer_from_cpf(cpf: str, user_id: int | None = None): return {"cpf": cpf, "user_id": user_id} with patch( "app.services.flows.order_flow.hydrate_mock_customer_from_cpf", new=fake_hydrate_mock_customer_from_cpf, ): response = await flow._try_collect_and_create_order( message="Meu CPF e 12345678909", user_id=10, extracted_fields={}, intents={}, ) self.assertIn("Encontrei 2 veiculo(s):", response) self.assertEqual(registry.calls[0][0], "consultar_estoque") async def test_order_flow_lists_stock_from_budget_when_vehicle_is_missing(self): state = FakeState( entries={ "pending_order_drafts": { 10: { "payload": {"cpf": "12345678909"}, "expires_at": utc_now() + timedelta(minutes=30), } } }, contexts={ 10: { "generic_memory": {"cpf": "12345678909", "orcamento_max": 50000}, "last_stock_results": [], "selected_vehicle": None, } }, ) registry = FakeRegistry() flow = OrderFlowHarness(state=state, registry=registry) async def fake_hydrate_mock_customer_from_cpf(cpf: str, user_id: int | None = None): return {"cpf": cpf, "user_id": user_id} with patch( "app.services.flows.order_flow.hydrate_mock_customer_from_cpf", new=fake_hydrate_mock_customer_from_cpf, ): response = await flow._try_collect_and_create_order( message="liste os carros com esse valor em estoque", user_id=10, extracted_fields={}, intents={}, turn_decision={"intent": "inventory_search", "domain": "sales", "action": "call_tool"}, ) self.assertEqual(registry.calls[0][0], "consultar_estoque") self.assertIn("Encontrei 2 veiculo(s):", response) self.assertIn("Honda Civic 2021", response) self.assertEqual(len(flow._get_last_stock_results(user_id=10)), 2) async def test_order_flow_restarts_open_draft_when_user_requests_new_budget_search(self): state = FakeState( entries={ "pending_order_drafts": { 10: { "payload": { "cpf": "12345678909", "vehicle_id": 15, "modelo_veiculo": "Volkswagen T-Cross 2022", "valor_veiculo": 73224.0, }, "expires_at": utc_now() + timedelta(minutes=30), } } }, contexts={ 10: { "active_domain": "sales", "generic_memory": {"cpf": "12345678909", "orcamento_max": 70000, "perfil_veiculo": ["suv"]}, "shared_memory": {"cpf": "12345678909", "orcamento_max": 70000, "perfil_veiculo": ["suv"]}, "last_stock_results": [ {"id": 15, "modelo": "Volkswagen T-Cross 2022", "categoria": "suv", "preco": 73224.0, "budget_relaxed": True}, ], "selected_vehicle": {"id": 15, "modelo": "Volkswagen T-Cross 2022", "categoria": "suv", "preco": 73224.0, "budget_relaxed": True}, } }, ) registry = FakeRegistry() flow = OrderFlowHarness(state=state, registry=registry) async def fake_hydrate_mock_customer_from_cpf(cpf: str, user_id: int | None = None): return {"cpf": cpf, "user_id": user_id} with patch( "app.services.flows.order_flow.hydrate_mock_customer_from_cpf", new=fake_hydrate_mock_customer_from_cpf, ): response = await flow._try_collect_and_create_order( message="agora quero comprar um carro de ate 60 mil", user_id=10, extracted_fields={}, intents={}, turn_decision={"intent": "order_create", "domain": "sales", "action": "collect_order_create"}, ) self.assertEqual(registry.calls[0][0], "consultar_estoque") self.assertIn("Encontrei 2 veiculo(s):", response) self.assertEqual(state.get_user_context(10)["generic_memory"]["orcamento_max"], 60000) self.assertIsNone(state.get_user_context(10)["selected_vehicle"]) async def test_order_flow_accepts_turn_decision_without_legacy_intents(self): state = FakeState( entries={ "pending_order_drafts": { 10: { "payload": {"cpf": "12345678909"}, "expires_at": utc_now() + timedelta(minutes=30), } } }, contexts={ 10: { "generic_memory": {}, "last_stock_results": [ {"id": 1, "modelo": "Honda Civic 2021", "categoria": "sedan", "preco": 51524.0}, ], "selected_vehicle": None, } } ) registry = FakeRegistry() flow = OrderFlowHarness(state=state, registry=registry) async def fake_hydrate_mock_customer_from_cpf(cpf: str, user_id: int | None = None): return {"cpf": cpf, "user_id": user_id} with patch( "app.services.flows.order_flow.hydrate_mock_customer_from_cpf", new=fake_hydrate_mock_customer_from_cpf, ): response = await flow._try_collect_and_create_order( message="esse", user_id=10, extracted_fields={"vehicle_id": 1}, intents={}, turn_decision={"intent": "order_create", "domain": "sales", "action": "collect_order_create"}, ) self.assertEqual(len(registry.calls), 1) tool_name, arguments, tool_user_id = registry.calls[0] self.assertEqual(tool_name, "realizar_pedido") self.assertEqual(tool_user_id, 10) self.assertEqual(arguments["vehicle_id"], 1) self.assertEqual(arguments["cpf"], "12345678909") self.assertIn("Pedido criado com sucesso.", response) async def test_order_flow_accepts_model_intent_without_keyword_trigger(self): state = FakeState( entries={ "pending_order_drafts": { 10: { "payload": {"cpf": "12345678909"}, "expires_at": utc_now() + timedelta(minutes=30), } } }, contexts={ 10: { "generic_memory": {}, "last_stock_results": [ {"id": 1, "modelo": "Honda Civic 2021", "categoria": "sedan", "preco": 51524.0}, ], "selected_vehicle": None, } } ) registry = FakeRegistry() flow = OrderFlowHarness(state=state, registry=registry) async def fake_hydrate_mock_customer_from_cpf(cpf: str, user_id: int | None = None): return {"cpf": cpf, "user_id": user_id} with patch( "app.services.flows.order_flow.hydrate_mock_customer_from_cpf", new=fake_hydrate_mock_customer_from_cpf, ): response = await flow._try_collect_and_create_order( message="esse", user_id=10, extracted_fields={"vehicle_id": 1}, intents={"order_create": True}, ) self.assertEqual(len(registry.calls), 1) tool_name, arguments, tool_user_id = registry.calls[0] self.assertEqual(tool_name, "realizar_pedido") self.assertEqual(tool_user_id, 10) self.assertEqual(arguments["vehicle_id"], 1) self.assertEqual(arguments["cpf"], "12345678909") self.assertIn("Pedido criado com sucesso.", response) async def test_order_flow_requests_vehicle_selection_from_last_stock_results(self): state = FakeState( contexts={ 10: { "generic_memory": {}, "last_stock_results": [ {"id": 1, "modelo": "Honda Civic 2021", "categoria": "sedan", "preco": 51524.0}, {"id": 2, "modelo": "Toyota Corolla 2020", "categoria": "hatch", "preco": 58476.0}, ], "selected_vehicle": None, } } ) registry = FakeRegistry() flow = OrderFlowHarness(state=state, registry=registry) response = await flow._try_collect_and_create_order( message="Quero fazer um pedido", user_id=10, extracted_fields={}, intents={"order_create": True}, ) self.assertEqual( response, "Pode me dizer a faixa de preco, o modelo ou o tipo de carro que voce procura.", ) self.assertEqual(registry.calls, []) self.assertEqual(state.get_user_context(10)["last_stock_results"], []) async def test_order_flow_generic_request_asks_for_price_range_even_with_previous_search_context(self): state = FakeState( contexts={ 10: { "generic_memory": {"cpf": "12345678909", "orcamento_max": 70000, "perfil_veiculo": ["suv"]}, "shared_memory": {"orcamento_max": 70000, "perfil_veiculo": ["suv"]}, "last_stock_results": [ {"id": 7, "modelo": "Fiat Argo 2020", "categoria": "suv", "preco": 61857.0}, {"id": 3, "modelo": "Chevrolet Onix 2022", "categoria": "suv", "preco": 51809.0}, ], "selected_vehicle": {"id": 7, "modelo": "Fiat Argo 2020", "categoria": "suv", "preco": 61857.0}, "pending_single_vehicle_confirmation": {"id": 7, "modelo": "Fiat Argo 2020", "categoria": "suv", "preco": 61857.0}, } } ) registry = FakeRegistry() flow = OrderFlowHarness(state=state, registry=registry) response = await flow._try_collect_and_create_order( message="Quero fazer um pedido de veiculo", user_id=10, extracted_fields={}, intents={"order_create": True}, turn_decision={"intent": "order_create", "domain": "sales", "action": "collect_order_create"}, ) draft = state.get_entry("pending_order_drafts", 10) context = state.get_user_context(10) self.assertEqual( response, "Pode me dizer a faixa de preco, o modelo ou o tipo de carro que voce procura.", ) self.assertEqual(registry.calls, []) self.assertIsNotNone(draft) self.assertEqual(draft["payload"], {}) self.assertEqual(context["last_stock_results"], []) self.assertIsNone(context["selected_vehicle"]) self.assertIsNone(context.get("pending_single_vehicle_confirmation")) self.assertNotIn("orcamento_max", context["generic_memory"]) self.assertNotIn("perfil_veiculo", context["generic_memory"]) self.assertNotIn("orcamento_max", context["shared_memory"]) self.assertNotIn("perfil_veiculo", context["shared_memory"]) async def test_order_flow_requires_confirmation_before_using_known_cpf(self): state = FakeState( contexts={ 10: { "generic_memory": {"cpf": "12345678909"}, "last_stock_results": [ {"id": 1, "modelo": "Honda Civic 2021", "categoria": "sedan", "preco": 51524.0}, ], "selected_vehicle": None, } } ) registry = FakeRegistry() flow = OrderFlowHarness(state=state, registry=registry) hydrated = [] async def fake_hydrate_mock_customer_from_cpf(cpf: str, user_id: int | None = None): hydrated.append((cpf, user_id)) return {"cpf": cpf, "user_id": user_id} with patch( "app.services.flows.order_flow.hydrate_mock_customer_from_cpf", new=fake_hydrate_mock_customer_from_cpf, ): first_response = await flow._try_collect_and_create_order( message="1", user_id=10, extracted_fields={}, intents={}, ) draft = state.get_entry("pending_order_drafts", 10) self.assertIsNotNone(draft) self.assertEqual(draft["payload"].get("vehicle_id"), 1) self.assertEqual(draft["payload"].get("cpf"), "12345678909") self.assertIs(draft["payload"].get("cpf_confirmed"), False) self.assertIn("cpf informado anteriormente", first_response.lower()) self.assertIn("continua correto", first_response.lower()) self.assertEqual(registry.calls, []) self.assertEqual(hydrated, []) second_response = await flow._try_collect_and_create_order( message="sim", user_id=10, extracted_fields={}, intents={}, ) self.assertEqual(len(registry.calls), 1) self.assertEqual(registry.calls[0][0], "realizar_pedido") self.assertEqual(registry.calls[0][1]["vehicle_id"], 1) self.assertEqual(registry.calls[0][1]["cpf"], "12345678909") self.assertEqual(hydrated, [("12345678909", 10)]) self.assertIn("Pedido criado com sucesso.", second_response) async def test_order_flow_updates_known_cpf_after_negative_confirmation_and_new_value(self): state = FakeState( contexts={ 10: { "generic_memory": {"cpf": "12345678909"}, "last_stock_results": [ {"id": 1, "modelo": "Honda Civic 2021", "categoria": "sedan", "preco": 51524.0}, ], "selected_vehicle": None, } } ) registry = FakeRegistry() flow = OrderFlowHarness(state=state, registry=registry) hydrated = [] async def fake_hydrate_mock_customer_from_cpf(cpf: str, user_id: int | None = None): hydrated.append((cpf, user_id)) return {"cpf": cpf, "user_id": user_id} with patch( "app.services.flows.order_flow.hydrate_mock_customer_from_cpf", new=fake_hydrate_mock_customer_from_cpf, ): await flow._try_collect_and_create_order( message="1", user_id=10, extracted_fields={}, intents={}, ) second_response = await flow._try_collect_and_create_order( message="nao", user_id=10, extracted_fields={}, intents={}, ) third_response = await flow._try_collect_and_create_order( message="52998224725", user_id=10, extracted_fields={}, intents={}, ) self.assertIn("me informe o cpf correto", second_response.lower()) self.assertEqual(len(registry.calls), 1) self.assertEqual(registry.calls[0][0], "realizar_pedido") self.assertEqual(registry.calls[0][1]["vehicle_id"], 1) self.assertEqual(registry.calls[0][1]["cpf"], "52998224725") self.assertEqual(hydrated, [("52998224725", 10)]) self.assertEqual(state.get_user_context(10)["generic_memory"]["cpf"], "52998224725") self.assertEqual(state.get_user_context(10)["shared_memory"]["cpf"], "52998224725") self.assertIn("Pedido criado com sucesso.", third_response) async def test_order_flow_single_stock_result_requires_explicit_confirmation(self): state = FakeState( entries={ "pending_order_drafts": { 10: { "payload": {"cpf": "12345678909"}, "expires_at": utc_now() + timedelta(minutes=30), } } }, contexts={ 10: { "generic_memory": {"cpf": "12345678909", "orcamento_max": 70000}, "shared_memory": {"orcamento_max": 70000}, "last_stock_results": [], "selected_vehicle": None, } }, ) registry = FakeRegistry() async def single_result_execute(tool_name: str, arguments: dict, user_id: int | None = None): registry.calls.append((tool_name, arguments, user_id)) if tool_name == "consultar_estoque": return [{"id": 7, "modelo": "Fiat Argo 2020", "categoria": "suv", "preco": 61857.0}] if tool_name == "realizar_pedido": return { "numero_pedido": "PED-TESTE-123", "status": "Ativo", "modelo_veiculo": "Fiat Argo 2020", "valor_veiculo": 61857.0, } raise AssertionError(f"Tool inesperada no teste: {tool_name}") registry.execute = single_result_execute flow = OrderFlowHarness(state=state, registry=registry) async def fake_hydrate_mock_customer_from_cpf(cpf: str, user_id: int | None = None): return {"cpf": cpf, "user_id": user_id} with patch( "app.services.flows.order_flow.hydrate_mock_customer_from_cpf", new=fake_hydrate_mock_customer_from_cpf, ): response = await flow._try_collect_and_create_order( message="quero comprar um carro de ate 70 mil", user_id=10, extracted_fields={}, intents={"order_create": True}, ) self.assertIn("Encontrei 1 opcao", response) self.assertIn("Fiat Argo 2020", response) self.assertEqual(len(registry.calls), 1) self.assertEqual(registry.calls[0][0], "consultar_estoque") self.assertEqual(state.get_user_context(10)["selected_vehicle"], None) self.assertIsNotNone(state.get_user_context(10)["pending_single_vehicle_confirmation"]) follow_up_response = await flow._try_collect_and_create_order( message="12345678909", user_id=10, extracted_fields={}, intents={}, ) self.assertIn("Posso seguir com essa opcao", follow_up_response) self.assertEqual(len(registry.calls), 1) confirmation_response = await flow._try_collect_and_create_order( message="sim", user_id=10, extracted_fields={}, intents={}, ) self.assertEqual(len(registry.calls), 2) self.assertEqual(registry.calls[1][0], "realizar_pedido") self.assertEqual(registry.calls[1][1]["vehicle_id"], 7) self.assertIn("Pedido criado com sucesso.", confirmation_response) async def test_order_flow_creates_order_with_selected_vehicle_from_list_index(self): state = FakeState( entries={ "pending_order_drafts": { 10: { "payload": {"cpf": "12345678909"}, "expires_at": utc_now() + timedelta(minutes=30), } } }, contexts={ 10: { "generic_memory": {"cpf": "12345678909"}, "last_stock_results": [ {"id": 1, "modelo": "Honda Civic 2021", "categoria": "sedan", "preco": 51524.0}, {"id": 2, "modelo": "Toyota Corolla 2020", "categoria": "hatch", "preco": 58476.0}, ], "selected_vehicle": None, } }, ) registry = FakeRegistry() flow = OrderFlowHarness(state=state, registry=registry) async def fake_hydrate_mock_customer_from_cpf(cpf: str, user_id: int | None = None): return {"cpf": cpf, "user_id": user_id} with patch( "app.services.flows.order_flow.hydrate_mock_customer_from_cpf", new=fake_hydrate_mock_customer_from_cpf, ): response = await flow._try_collect_and_create_order( message="2", user_id=10, extracted_fields={}, intents={}, ) self.assertEqual(len(registry.calls), 1) tool_name, arguments, tool_user_id = registry.calls[0] self.assertEqual(tool_name, "realizar_pedido") self.assertEqual(tool_user_id, 10) self.assertEqual(arguments["vehicle_id"], 2) self.assertEqual(arguments["cpf"], "12345678909") self.assertIn("Veiculo: Toyota Corolla 2020", response) self.assertEqual(state.get_user_context(10).get("selected_vehicle"), None) self.assertEqual(state.get_user_context(10).get("last_stock_results"), []) self.assertEqual(state.get_user_context(10).get("pending_single_vehicle_confirmation"), None) async def test_order_flow_reuses_last_stock_results_when_user_requests_order_by_model_name(self): state = FakeState( contexts={ 10: { "generic_memory": {}, "last_stock_results": [ {"id": 9, "modelo": "Hyundai HB20S 2022", "categoria": "sedan", "preco": 76000.0}, {"id": 2, "modelo": "Toyota Corolla 2020", "categoria": "hatch", "preco": 58476.0}, ], "selected_vehicle": None, } } ) registry = FakeRegistry() flow = OrderFlowHarness(state=state, registry=registry) first_response = await flow._try_collect_and_create_order( message="Quero fazer o pedido do Hyundai HB20S 2022", user_id=10, extracted_fields={"modelo_veiculo": "Hyundai HB20S 2022"}, intents={"order_create": True}, ) self.assertIn("cpf do cliente", first_response.lower()) self.assertEqual(registry.calls, []) self.assertEqual(state.get_user_context(10)["selected_vehicle"]["id"], 9) async def fake_hydrate_mock_customer_from_cpf(cpf: str, user_id: int | None = None): return {"cpf": cpf, "user_id": user_id} with patch( "app.services.flows.order_flow.hydrate_mock_customer_from_cpf", new=fake_hydrate_mock_customer_from_cpf, ): second_response = await flow._try_collect_and_create_order( message="12345678909", user_id=10, extracted_fields={}, intents={}, ) self.assertEqual(len(registry.calls), 1) tool_name, arguments, tool_user_id = registry.calls[0] self.assertEqual(tool_name, "realizar_pedido") self.assertEqual(tool_user_id, 10) self.assertEqual(arguments["vehicle_id"], 9) self.assertEqual(arguments["cpf"], "12345678909") self.assertIn("Veiculo: Hyundai HB20S 2022", second_response) async def test_order_flow_accepts_partial_model_reference_from_last_stock_results(self): state = FakeState( contexts={ 10: { "active_domain": "sales", "generic_memory": {"orcamento_max": 70000}, "shared_memory": {"orcamento_max": 70000}, "last_stock_results": [ {"id": 15, "modelo": "Volkswagen T-Cross 2024", "categoria": "hatch", "preco": 59306.0}, {"id": 16, "modelo": "Volkswagen T-Cross 2024", "categoria": "hatch", "preco": 59306.0}, {"id": 2, "modelo": "Toyota Corolla 2020", "categoria": "hatch", "preco": 58476.0}, ], "selected_vehicle": None, } } ) registry = FakeRegistry() flow = OrderFlowHarness(state=state, registry=registry) response = await flow._try_collect_and_create_order( message="gostaria de fazer o pedido do Volkswagen T-Cros", user_id=10, extracted_fields={}, intents={"order_create": True}, ) draft = state.get_entry("pending_order_drafts", 10) self.assertIsNotNone(draft) self.assertEqual(draft["payload"]["vehicle_id"], 15) self.assertEqual(draft["payload"]["modelo_veiculo"], "Volkswagen T-Cross 2024") self.assertEqual(state.get_user_context(10)["selected_vehicle"]["id"], 15) self.assertIn("cpf do cliente", response.lower()) self.assertEqual(registry.calls, []) async def test_order_flow_bootstraps_selection_from_last_stock_results_without_repeating_order_verb(self): state = FakeState( contexts={ 10: { "active_domain": "sales", "generic_memory": {"orcamento_max": 70000, "perfil_veiculo": ["suv"]}, "shared_memory": {"orcamento_max": 70000, "perfil_veiculo": ["suv"]}, "last_stock_results": [ {"id": 7, "modelo": "Fiat Argo 2020", "categoria": "suv", "preco": 61857.0}, {"id": 3, "modelo": "Chevrolet Onix 2022", "categoria": "suv", "preco": 51809.0}, ], "selected_vehicle": None, } } ) registry = FakeRegistry() flow = OrderFlowHarness(state=state, registry=registry) first_response = await flow._try_collect_and_create_order( message="quero a opcao 1", user_id=10, extracted_fields={}, intents={}, ) self.assertIn("cpf do cliente", first_response.lower()) self.assertEqual(registry.calls, []) self.assertEqual(state.get_user_context(10)["selected_vehicle"]["id"], 7) async def fake_hydrate_mock_customer_from_cpf(cpf: str, user_id: int | None = None): return {"cpf": cpf, "user_id": user_id} with patch( "app.services.flows.order_flow.hydrate_mock_customer_from_cpf", new=fake_hydrate_mock_customer_from_cpf, ): second_response = await flow._try_collect_and_create_order( message="12345678909", user_id=10, extracted_fields={}, intents={}, ) self.assertEqual(len(registry.calls), 1) self.assertEqual(registry.calls[0][0], "realizar_pedido") self.assertEqual(registry.calls[0][1]["vehicle_id"], 7) self.assertEqual(registry.calls[0][1]["cpf"], "12345678909") self.assertIn("Veiculo: Fiat Argo 2020", second_response) async def test_order_flow_reads_vehicle_selection_from_pending_stock_selection_repository_entry(self): state = FakeState( entries={ "pending_stock_selections": { 10: { "payload": [ {"id": 7, "modelo": "Fiat Argo 2020", "categoria": "suv", "preco": 61857.0}, {"id": 3, "modelo": "Chevrolet Onix 2022", "categoria": "suv", "preco": 51809.0}, ], "expires_at": utc_now() + timedelta(minutes=30), } } }, contexts={ 10: { "active_domain": "sales", "generic_memory": {}, "shared_memory": {}, "last_stock_results": [], "selected_vehicle": None, } } ) registry = FakeRegistry() flow = OrderFlowHarness(state=state, registry=registry) response = await flow._try_collect_and_create_order( message="quero a opcao 1", user_id=10, extracted_fields={}, intents={}, ) self.assertIn("cpf do cliente", response.lower()) self.assertEqual(registry.calls, []) self.assertEqual(state.get_user_context(10)["selected_vehicle"]["id"], 7) async def test_order_flow_keeps_relaxed_budget_suggestion_selected_across_follow_up(self): state = FakeState( entries={ "pending_stock_selections": { 10: { "payload": [ {"id": 9, "modelo": "Hyundai HB20S 2022", "categoria": "suv", "preco": 76000.0, "budget_relaxed": True}, {"id": 2, "modelo": "Toyota Corolla 2020", "categoria": "suv", "preco": 58476.0, "budget_relaxed": True}, ], "expires_at": utc_now() + timedelta(minutes=30), } } }, contexts={ 10: { "active_domain": "sales", "generic_memory": {"orcamento_max": 70000, "perfil_veiculo": ["suv"]}, "shared_memory": {"orcamento_max": 70000, "perfil_veiculo": ["suv"]}, "last_stock_results": [], "selected_vehicle": None, } } ) registry = FakeRegistry() flow = OrderFlowHarness(state=state, registry=registry) first_response = await flow._try_collect_and_create_order( message="1", user_id=10, extracted_fields={}, intents={}, ) self.assertIn("cpf do cliente", first_response.lower()) self.assertEqual(state.get_user_context(10)["selected_vehicle"]["id"], 9) async def fake_hydrate_mock_customer_from_cpf(cpf: str, user_id: int | None = None): return {"cpf": cpf, "user_id": user_id} with patch( "app.services.flows.order_flow.hydrate_mock_customer_from_cpf", new=fake_hydrate_mock_customer_from_cpf, ): second_response = await flow._try_collect_and_create_order( message="12345678909", user_id=10, extracted_fields={}, intents={}, ) self.assertEqual(len(registry.calls), 1) self.assertEqual(registry.calls[0][0], "realizar_pedido") self.assertEqual(registry.calls[0][1]["vehicle_id"], 9) self.assertEqual(registry.calls[0][1]["cpf"], "12345678909") self.assertIn("Veiculo: Hyundai HB20S 2022", second_response) async def test_order_flow_restores_draft_from_context_snapshot_when_bucket_is_missing(self): state = FakeState( contexts={ 10: { "active_domain": "sales", "active_task": "order_create", "generic_memory": {}, "shared_memory": {}, "collected_slots": { "order_create": {"vehicle_id": 2, "modelo_veiculo": "Toyota Corolla 2020"}, }, "flow_snapshots": { "order_create": { "payload": {"vehicle_id": 2, "modelo_veiculo": "Toyota Corolla 2020"}, "expires_at": utc_now() + timedelta(minutes=30), } }, "last_stock_results": [ {"id": 2, "modelo": "Toyota Corolla 2020", "categoria": "hatch", "preco": 58476.0}, ], "selected_vehicle": {"id": 2, "modelo": "Toyota Corolla 2020", "categoria": "hatch", "preco": 58476.0}, } } ) registry = FakeRegistry() flow = OrderFlowHarness(state=state, registry=registry) async def fake_hydrate_mock_customer_from_cpf(cpf: str, user_id: int | None = None): return {"cpf": cpf, "user_id": user_id} with patch( "app.services.flows.order_flow.hydrate_mock_customer_from_cpf", new=fake_hydrate_mock_customer_from_cpf, ): response = await flow._try_collect_and_create_order( message="12345678909", user_id=10, extracted_fields={}, intents={}, ) self.assertEqual(len(registry.calls), 1) tool_name, arguments, tool_user_id = registry.calls[0] self.assertEqual(tool_name, "realizar_pedido") self.assertEqual(tool_user_id, 10) self.assertEqual(arguments["vehicle_id"], 2) self.assertEqual(arguments["cpf"], "12345678909") self.assertIn("Veiculo: Toyota Corolla 2020", response) async def test_order_flow_selection_uses_list_position_not_vehicle_id(self): state = FakeState( entries={ "pending_order_drafts": { 10: { "payload": {"cpf": "12345678909"}, "expires_at": utc_now() + timedelta(minutes=30), } } }, contexts={ 10: { "generic_memory": {"cpf": "12345678909"}, "last_stock_results": [ {"id": 3, "modelo": "Chevrolet Onix 2022", "categoria": "suv", "preco": 51809.0}, {"id": 7, "modelo": "Fiat Argo 2020", "categoria": "suv", "preco": 61857.0}, ], "selected_vehicle": None, } }, ) registry = FakeRegistry() flow = OrderFlowHarness(state=state, registry=registry) async def fake_hydrate_mock_customer_from_cpf(cpf: str, user_id: int | None = None): return {"cpf": cpf, "user_id": user_id} with patch( "app.services.flows.order_flow.hydrate_mock_customer_from_cpf", new=fake_hydrate_mock_customer_from_cpf, ): response = await flow._try_collect_and_create_order( message="3", user_id=10, extracted_fields={}, intents={}, ) self.assertEqual(registry.calls, []) self.assertIn("escolha primeiro qual veiculo", response.lower()) self.assertIn("1. Chevrolet Onix 2022", response) self.assertIn("2. Fiat Argo 2020", response) async def test_order_flow_keeps_draft_and_clears_retryable_field_on_tool_error(self): state = FakeState( entries={ "pending_order_drafts": { 10: { "payload": {"cpf": "12345678909", "vehicle_id": 99}, "expires_at": utc_now() + timedelta(minutes=30), } } }, contexts={ 10: { "generic_memory": {"cpf": "12345678909"}, "last_stock_results": [], "selected_vehicle": None, } }, ) registry = FakeRegistry() registry.raise_http_exception = HTTPException( status_code=409, detail={ "code": "vehicle_already_reserved", "message": "Este veiculo ja esta reservado e nao aparece mais no estoque disponivel.", "retryable": True, "field": "vehicle_id", }, ) flow = OrderFlowHarness(state=state, registry=registry) async def fake_hydrate_mock_customer_from_cpf(cpf: str, user_id: int | None = None): return {"cpf": cpf, "user_id": user_id} with patch( "app.services.flows.order_flow.hydrate_mock_customer_from_cpf", new=fake_hydrate_mock_customer_from_cpf, ): response = await flow._try_collect_and_create_order( message="quero esse carro", user_id=10, extracted_fields={}, intents={}, ) draft = state.get_entry("pending_order_drafts", 10) self.assertIn("ja esta reservado", response) self.assertIsNotNone(draft) self.assertEqual(draft["payload"].get("cpf"), "12345678909") self.assertNotIn("vehicle_id", draft["payload"]) async def test_order_flow_clears_draft_after_non_retryable_credit_rejection(self): state = FakeState( entries={ "pending_order_drafts": { 10: { "payload": {"cpf": "12345678909", "vehicle_id": 9, "modelo_veiculo": "Hyundai HB20S 2022"}, "expires_at": utc_now() + timedelta(minutes=30), } } }, contexts={ 10: { "active_domain": "sales", "generic_memory": {"cpf": "12345678909"}, "shared_memory": {"cpf": "12345678909"}, "last_stock_results": [ {"id": 9, "modelo": "Hyundai HB20S 2022", "categoria": "sedan", "preco": 76000.0}, ], "selected_vehicle": {"id": 9, "modelo": "Hyundai HB20S 2022", "categoria": "sedan", "preco": 76000.0}, "pending_single_vehicle_confirmation": {"id": 9, "modelo": "Hyundai HB20S 2022", "categoria": "sedan", "preco": 76000.0}, } }, ) registry = FakeRegistry() registry.raise_http_exception = HTTPException( status_code=400, detail={ "code": "credit_not_approved", "message": "Cliente nao aprovado para este valor. Limite disponivel: R$ 70878.00.", "retryable": False, "field": "cpf", }, ) flow = OrderFlowHarness(state=state, registry=registry) async def fake_hydrate_mock_customer_from_cpf(cpf: str, user_id: int | None = None): return {"cpf": cpf, "user_id": user_id} with patch( "app.services.flows.order_flow.hydrate_mock_customer_from_cpf", new=fake_hydrate_mock_customer_from_cpf, ): response = await flow._try_collect_and_create_order( message="12345678909", user_id=10, extracted_fields={}, intents={}, ) self.assertIn("nao aprovado", response) self.assertIsNone(state.get_entry("pending_order_drafts", 10)) self.assertEqual(state.get_user_context(10)["selected_vehicle"], None) self.assertEqual(state.get_user_context(10)["last_stock_results"], []) self.assertEqual(state.get_user_context(10).get("pending_single_vehicle_confirmation"), None) async def test_order_flow_refreshes_stale_stock_results_when_budget_changes(self): state = FakeState( contexts={ 10: { "generic_memory": {"cpf": "12345678909", "orcamento_max": 45000}, "last_stock_results": [ {"id": 3, "modelo": "Chevrolet Onix 2022", "categoria": "suv", "preco": 51809.0}, {"id": 7, "modelo": "Fiat Argo 2020", "categoria": "suv", "preco": 61857.0}, ], "selected_vehicle": {"id": 3, "modelo": "Chevrolet Onix 2022", "categoria": "suv", "preco": 51809.0}, } } ) registry = FakeRegistry() flow = OrderFlowHarness(state=state, registry=registry) async def fake_hydrate_mock_customer_from_cpf(cpf: str, user_id: int | None = None): return {"cpf": cpf, "user_id": user_id} with patch( "app.services.flows.order_flow.hydrate_mock_customer_from_cpf", new=fake_hydrate_mock_customer_from_cpf, ): response = await flow._try_collect_and_create_order( message="Quero comprar um carro de 45 mil, meu CPF e 12345678909", user_id=10, extracted_fields={"cpf": "12345678909"}, intents={}, turn_decision={"intent": "order_create", "domain": "sales", "action": "collect_order_create"}, ) self.assertEqual(registry.calls[0][0], "consultar_estoque") self.assertNotIn("Chevrolet Onix 2022", response) self.assertEqual(state.get_user_context(10)["selected_vehicle"], None) self.assertEqual(len(state.get_user_context(10)["last_stock_results"]), 2) async def test_order_flow_refreshes_stale_stock_results_when_profile_changes(self): state = FakeState( contexts={ 10: { "generic_memory": {"cpf": "12345678909", "orcamento_max": 50000, "perfil_veiculo": ["hatch"]}, "last_stock_results": [ {"id": 3, "modelo": "Chevrolet Onix 2022", "categoria": "suv", "preco": 48000.0}, ], "selected_vehicle": {"id": 3, "modelo": "Chevrolet Onix 2022", "categoria": "suv", "preco": 48000.0}, } } ) registry = FakeRegistry() flow = OrderFlowHarness(state=state, registry=registry) async def fake_hydrate_mock_customer_from_cpf(cpf: str, user_id: int | None = None): return {"cpf": cpf, "user_id": user_id} with patch( "app.services.flows.order_flow.hydrate_mock_customer_from_cpf", new=fake_hydrate_mock_customer_from_cpf, ): response = await flow._try_collect_and_create_order( message="Quero comprar um hatch de 50 mil, meu CPF e 12345678909", user_id=10, extracted_fields={"cpf": "12345678909"}, intents={}, turn_decision={"intent": "order_create", "domain": "sales", "action": "collect_order_create"}, ) self.assertEqual(registry.calls[0][0], "consultar_estoque") self.assertEqual(state.get_user_context(10)["selected_vehicle"], None) self.assertTrue( all(item.get("categoria") != "suv" for item in state.get_user_context(10)["last_stock_results"]) ) class RentalFlowDraftTests(unittest.IsolatedAsyncioTestCase): def _base_context(self): return { "active_domain": "general", "active_task": None, "generic_memory": {}, "shared_memory": {}, "order_queue": [], "pending_order_selection": None, "pending_switch": None, "last_stock_results": [], "selected_vehicle": None, "last_rental_results": [], "selected_rental_vehicle": None, } async def test_rental_flow_lists_fleet_and_stores_pending_selection(self): state = FakeState(contexts={21: self._base_context()}) registry = FakeRegistry() flow = RentalFlowHarness(state=state, registry=registry) response = await flow._try_collect_and_open_rental( message="quais carros estao disponiveis para aluguel", user_id=21, extracted_fields={}, intents={}, turn_decision={"intent": "rental_list", "domain": "rental", "action": "answer_user"}, ) self.assertEqual(registry.calls[0][0], "consultar_frota_aluguel") self.assertEqual(registry.calls[0][1]["ordenar_diaria"], "random") self.assertIn("veiculo(s) para locacao", response) self.assertIsNotNone(state.get_entry("pending_rental_selections", 21)) self.assertEqual(state.get_user_context(21)["active_domain"], "rental") async def test_rental_flow_filters_fleet_by_category_when_user_requests_suv(self): state = FakeState(contexts={21: self._base_context()}) registry = FakeRegistry() flow = RentalFlowHarness(state=state, registry=registry) response = await flow._try_collect_and_open_rental( message="quais suv estao disponiveis para aluguel", user_id=21, extracted_fields={}, intents={}, turn_decision={"intent": "rental_list", "domain": "rental", "action": "answer_user"}, ) self.assertEqual(registry.calls[0][0], "consultar_frota_aluguel") self.assertEqual(registry.calls[0][1]["categoria"], "suv") self.assertEqual(registry.calls[0][1]["ordenar_diaria"], "asc") self.assertIn("veiculo(s) para locacao", response) async def test_rental_flow_filters_fleet_by_model_when_user_requests_specific_vehicle(self): state = FakeState(contexts={21: self._base_context()}) registry = FakeRegistry() flow = RentalFlowHarness(state=state, registry=registry) response = await flow._try_collect_and_open_rental( message="quero alugar um chevrolet tracker", user_id=21, extracted_fields={}, intents={}, turn_decision={"intent": "rental_create", "domain": "rental", "action": "answer_user"}, ) self.assertEqual(registry.calls[0][0], "consultar_frota_aluguel") self.assertEqual(registry.calls[0][1]["modelo"], "Chevrolet Tracker") self.assertEqual(registry.calls[0][1]["ordenar_diaria"], "asc") self.assertIn("veiculo(s) para locacao", response) async def test_rental_flow_ignores_vehicle_year_when_filtering_specific_model(self): state = FakeState(contexts={21: self._base_context()}) registry = FakeRegistry() flow = RentalFlowHarness(state=state, registry=registry) response = await flow._try_collect_and_open_rental( message="quero alugar um fiat pulse 2024", user_id=21, extracted_fields={}, intents={}, turn_decision={"intent": "rental_create", "domain": "rental", "action": "answer_user"}, ) self.assertEqual(registry.calls[0][0], "consultar_frota_aluguel") self.assertEqual(registry.calls[0][1]["modelo"], "Fiat Pulse") self.assertEqual(registry.calls[0][1]["ordenar_diaria"], "asc") self.assertIn("veiculo(s) para locacao", response) async def test_rental_flow_keeps_generic_listing_when_request_is_not_a_specific_model(self): state = FakeState(contexts={21: self._base_context()}) registry = FakeRegistry() flow = RentalFlowHarness(state=state, registry=registry) response = await flow._try_collect_and_open_rental( message="quero alugar um carro para viajar com a familia", user_id=21, extracted_fields={}, intents={}, turn_decision={"intent": "rental_create", "domain": "rental", "action": "answer_user"}, ) self.assertEqual(registry.calls[0][0], "consultar_frota_aluguel") self.assertNotIn("modelo", registry.calls[0][1]) self.assertEqual(registry.calls[0][1]["ordenar_diaria"], "random") self.assertIn("veiculo(s) para locacao", response) async def test_rental_flow_accepts_vehicle_selection_from_list_index(self): state = FakeState( entries={ "pending_rental_selections": { 21: { "payload": [ {"id": 1, "placa": "RAA1A01", "modelo": "Chevrolet Tracker", "categoria": "suv", "ano": 2024, "valor_diaria": 219.9, "status": "disponivel"}, {"id": 2, "placa": "RAA1A02", "modelo": "Fiat Pulse", "categoria": "suv", "ano": 2024, "valor_diaria": 189.9, "status": "disponivel"}, ], "expires_at": utc_now() + timedelta(minutes=15), } } }, contexts={21: self._base_context() | {"active_domain": "rental"}}, ) registry = FakeRegistry() flow = RentalFlowHarness(state=state, registry=registry) response = await flow._try_collect_and_open_rental( message="1", user_id=21, extracted_fields={}, intents={}, turn_decision={"intent": "rental_create", "domain": "rental", "action": "answer_user"}, ) draft = state.get_entry("pending_rental_drafts", 21) self.assertIsNotNone(draft) self.assertEqual(draft["payload"]["rental_vehicle_id"], 1) self.assertEqual(state.get_user_context(21)["selected_rental_vehicle"]["placa"], "RAA1A01") self.assertIn("a data e hora de inicio da locacao", response) self.assertIn("a data e hora previstas para devolucao", response) async def test_rental_flow_opens_contract_after_collecting_dates(self): state = FakeState( entries={ "pending_rental_drafts": { 21: { "payload": { "rental_vehicle_id": 1, "placa": "RAA1A01", "modelo_veiculo": "Chevrolet Tracker", }, "expires_at": utc_now() + timedelta(minutes=15), } } }, contexts={21: self._base_context() | {"active_domain": "rental", "selected_rental_vehicle": {"id": 1, "placa": "RAA1A01", "modelo": "Chevrolet Tracker", "categoria": "suv", "ano": 2024, "valor_diaria": 219.9, "status": "disponivel"}}}, ) registry = FakeRegistry() flow = RentalFlowHarness(state=state, registry=registry) response = await flow._try_collect_and_open_rental( message="20/03/2026 10:00 ate 23/03/2026 10:00", user_id=21, extracted_fields={}, intents={}, turn_decision={"intent": "rental_create", "domain": "rental", "action": "answer_user"}, ) self.assertEqual(registry.calls[0][0], "abrir_locacao_aluguel") self.assertEqual(registry.calls[0][1]["rental_vehicle_id"], 1) self.assertEqual(registry.calls[0][1]["data_inicio"], "20/03/2026 10:00") self.assertEqual(registry.calls[0][1]["data_fim_prevista"], "23/03/2026 10:00") self.assertIsNone(state.get_entry("pending_rental_drafts", 21)) self.assertIn("LOC-TESTE-123", response) async def test_rental_flow_preserves_relative_dates_from_initial_request_until_vehicle_selection(self): fixed_now = lambda: datetime(2026, 3, 19, 9, 0) state = FakeState(contexts={21: self._base_context()}) registry = FakeRegistry() flow = RentalFlowHarness(state=state, registry=registry, rental_now_provider=fixed_now) list_response = await flow._try_collect_and_open_rental( message="Quero alugar um hatch amanha 10h ate depois de amanha 10h", user_id=21, extracted_fields={}, intents={}, turn_decision={"intent": "rental_create", "domain": "rental", "action": "answer_user"}, ) self.assertEqual(registry.calls[0][0], "consultar_frota_aluguel") self.assertNotIn("modelo", registry.calls[0][1]) draft = state.get_entry("pending_rental_drafts", 21) self.assertIsNotNone(draft) self.assertEqual(draft["payload"]["data_inicio"], "20/03/2026 10:00") self.assertEqual(draft["payload"]["data_fim_prevista"], "21/03/2026 10:00") self.assertEqual(draft["payload"]["categoria"], "hatch") self.assertIn("veiculo(s) para locacao", list_response) open_response = await flow._try_collect_and_open_rental( message="1", user_id=21, extracted_fields={}, intents={}, turn_decision={"intent": "rental_create", "domain": "rental", "action": "answer_user"}, ) self.assertEqual(registry.calls[1][0], "abrir_locacao_aluguel") self.assertEqual(registry.calls[1][1]["data_inicio"], "20/03/2026 10:00") self.assertEqual(registry.calls[1][1]["data_fim_prevista"], "21/03/2026 10:00") self.assertIn("LOC-TESTE-123", open_response) async def test_rental_flow_preserves_relative_dates_even_when_day_words_arrive_truncated(self): fixed_now = lambda: datetime(2026, 3, 19, 9, 0) state = FakeState(contexts={21: self._base_context()}) registry = FakeRegistry() flow = RentalFlowHarness(state=state, registry=registry, rental_now_provider=fixed_now) list_response = await flow._try_collect_and_open_rental( message="Quero alugar um hatch amanh 10h at depois de amanh 10h", user_id=21, extracted_fields={}, intents={}, turn_decision={"intent": "rental_create", "domain": "rental", "action": "answer_user"}, ) self.assertEqual(registry.calls[0][0], "consultar_frota_aluguel") draft = state.get_entry("pending_rental_drafts", 21) self.assertIsNotNone(draft) self.assertEqual(draft["payload"]["data_inicio"], "20/03/2026 10:00") self.assertEqual(draft["payload"]["data_fim_prevista"], "21/03/2026 10:00") self.assertNotIn("modelo", registry.calls[0][1]) self.assertIn("veiculo(s) para locacao", list_response) open_response = await flow._try_collect_and_open_rental( message="1", user_id=21, extracted_fields={}, intents={}, turn_decision={"intent": "rental_create", "domain": "rental", "action": "answer_user"}, ) self.assertEqual(registry.calls[1][0], "abrir_locacao_aluguel") self.assertEqual(registry.calls[1][1]["data_inicio"], "20/03/2026 10:00") self.assertEqual(registry.calls[1][1]["data_fim_prevista"], "21/03/2026 10:00") self.assertIn("LOC-TESTE-123", open_response) async def test_rental_flow_preserves_relative_dates_even_when_day_words_arrive_with_question_marks(self): fixed_now = lambda: datetime(2026, 3, 19, 9, 0) state = FakeState(contexts={21: self._base_context()}) registry = FakeRegistry() flow = RentalFlowHarness(state=state, registry=registry, rental_now_provider=fixed_now) list_response = await flow._try_collect_and_open_rental( message="Quero alugar um hatch amanh? 10h at? depois de amanh? 10h", user_id=21, extracted_fields={}, intents={}, turn_decision={"intent": "rental_create", "domain": "rental", "action": "answer_user"}, ) self.assertEqual(registry.calls[0][0], "consultar_frota_aluguel") draft = state.get_entry("pending_rental_drafts", 21) self.assertIsNotNone(draft) self.assertEqual(draft["payload"]["data_inicio"], "20/03/2026 10:00") self.assertEqual(draft["payload"]["data_fim_prevista"], "21/03/2026 10:00") self.assertNotIn("modelo", registry.calls[0][1]) self.assertIn("veiculo(s) para locacao", list_response) open_response = await flow._try_collect_and_open_rental( message="1", user_id=21, extracted_fields={}, intents={}, turn_decision={"intent": "rental_create", "domain": "rental", "action": "answer_user"}, ) self.assertEqual(registry.calls[1][0], "abrir_locacao_aluguel") self.assertEqual(registry.calls[1][1]["data_inicio"], "20/03/2026 10:00") self.assertEqual(registry.calls[1][1]["data_fim_prevista"], "21/03/2026 10:00") self.assertIn("LOC-TESTE-123", open_response) async def test_rental_flow_rehydrates_search_payload_from_context_when_selection_survives_without_draft(self): state = FakeState( entries={ "pending_rental_selections": { 21: { "payload": [ {"id": 1, "placa": "RAA1A01", "modelo": "Chevrolet Tracker", "categoria": "hatch", "ano": 2024, "valor_diaria": 219.9, "status": "disponivel"}, {"id": 2, "placa": "RAA1A02", "modelo": "Fiat Pulse", "categoria": "hatch", "ano": 2024, "valor_diaria": 189.9, "status": "disponivel"}, ], "expires_at": utc_now() + timedelta(minutes=15), } } }, contexts={ 21: self._base_context() | { "active_domain": "rental", } }, ) state.get_entry("pending_rental_selections", 21)["search_payload"] = { "categoria": "hatch", "data_inicio": "20/03/2026 10:00", "data_fim_prevista": "21/03/2026 10:00", } registry = FakeRegistry() flow = RentalFlowHarness(state=state, registry=registry) response = await flow._try_collect_and_open_rental( message="2", user_id=21, extracted_fields={}, intents={}, turn_decision={"intent": "rental_create", "domain": "rental", "action": "answer_user"}, ) self.assertEqual(registry.calls[0][0], "abrir_locacao_aluguel") self.assertEqual(registry.calls[0][1]["rental_vehicle_id"], 2) self.assertEqual(registry.calls[0][1]["data_inicio"], "20/03/2026 10:00") self.assertEqual(registry.calls[0][1]["data_fim_prevista"], "21/03/2026 10:00") self.assertIn("LOC-TESTE-123", response) async def test_rental_flow_opens_contract_after_collecting_relative_dates_follow_up(self): fixed_now = lambda: datetime(2026, 3, 19, 9, 0) state = FakeState( entries={ "pending_rental_drafts": { 21: { "payload": { "rental_vehicle_id": 1, "placa": "RAA1A01", "modelo_veiculo": "Chevrolet Tracker", }, "expires_at": utc_now() + timedelta(minutes=15), } } }, contexts={21: self._base_context() | {"active_domain": "rental", "selected_rental_vehicle": {"id": 1, "placa": "RAA1A01", "modelo": "Chevrolet Tracker", "categoria": "suv", "ano": 2024, "valor_diaria": 219.9, "status": "disponivel"}}}, ) registry = FakeRegistry() flow = RentalFlowHarness(state=state, registry=registry, rental_now_provider=fixed_now) response = await flow._try_collect_and_open_rental( message="amanha 10h ate depois de amanha 10h", user_id=21, extracted_fields={}, intents={}, turn_decision={"intent": "rental_create", "domain": "rental", "action": "answer_user"}, ) self.assertEqual(registry.calls[0][0], "abrir_locacao_aluguel") self.assertEqual(registry.calls[0][1]["data_inicio"], "20/03/2026 10:00") self.assertEqual(registry.calls[0][1]["data_fim_prevista"], "21/03/2026 10:00") self.assertIsNone(state.get_entry("pending_rental_drafts", 21)) self.assertIn("LOC-TESTE-123", response) class ReviewFlowDraftTests(unittest.IsolatedAsyncioTestCase): async def test_review_flow_extracts_relative_datetime_from_followup_message(self): fixed_now = lambda: datetime(2026, 3, 12, 9, 0) state = FakeState( entries={ "pending_review_drafts": { 21: { "payload": {"placa": "ABC1269"}, "expires_at": utc_now() + timedelta(minutes=30), } } } ) registry = FakeRegistry() flow = ReviewFlowHarness(state=state, registry=registry, review_now_provider=fixed_now) response = await flow._try_collect_and_schedule_review( message="Eu gostaria de marcar amanha as 16 horas", user_id=21, extracted_fields={}, intents={}, turn_decision={"intent": "review_schedule", "domain": "review", "action": "answer_user"}, ) draft = state.get_entry("pending_review_drafts", 21) self.assertIsNotNone(draft) self.assertIn("data_hora", draft["payload"]) self.assertEqual(draft["payload"]["data_hora"][-5:], "16:00") self.assertIn("o modelo do veiculo", response) self.assertTrue(any(payload.get("review_flow_source") == "draft" for _, payload in flow.logged_events)) async def test_review_flow_date_only_with_other_missing_fields_mentions_captured_date_and_requested_time(self): fixed_now = lambda: datetime(2026, 3, 12, 9, 0) state = FakeState( entries={ "pending_review_drafts": { 21: { "payload": {"placa": "ABC1269"}, "expires_at": utc_now() + timedelta(minutes=30), } } } ) registry = FakeRegistry() flow = ReviewFlowHarness(state=state, registry=registry, review_now_provider=fixed_now) response = await flow._try_collect_and_schedule_review( message="amanha", user_id=21, extracted_fields={}, intents={}, turn_decision={"intent": "review_schedule", "domain": "review", "action": "answer_user"}, ) draft = state.get_entry("pending_review_drafts", 21) self.assertIsNotNone(draft) self.assertEqual(draft["payload"].get("data_hora_base"), "13/03/2026") self.assertIn("Perfeito. Tenho a data 13/03/2026.", response) self.assertIn("- o horario desejado para a revisao", response) self.assertIn("- o modelo do veiculo", response) self.assertNotIn("- a data e hora desejada para a revisao", response) async def test_review_flow_date_only_supports_day_after_tomorrow(self): fixed_now = lambda: datetime(2026, 3, 12, 9, 0) state = FakeState( entries={ "pending_review_drafts": { 21: { "payload": {"placa": "ABC1269"}, "expires_at": utc_now() + timedelta(minutes=30), } } } ) registry = FakeRegistry() flow = ReviewFlowHarness(state=state, registry=registry, review_now_provider=fixed_now) response = await flow._try_collect_and_schedule_review( message="depois de amanha", user_id=21, extracted_fields={}, intents={}, turn_decision={"intent": "review_schedule", "domain": "review", "action": "answer_user"}, ) draft = state.get_entry("pending_review_drafts", 21) self.assertIsNotNone(draft) self.assertEqual(draft["payload"].get("data_hora_base"), "14/03/2026") self.assertIn("Perfeito. Tenho a data 14/03/2026.", response) self.assertIn("- o horario desejado para a revisao", response) async def test_review_flow_keeps_review_draft_when_time_follow_up_is_misclassified_as_sales(self): state = FakeState( entries={ "pending_review_drafts": { 21: { "payload": { "placa": "ABC1269", "modelo": "Onix", "ano": 2024, "km": 12000, "revisao_previa_concessionaria": False, "data_hora_base": "13/03/2026", }, "expires_at": utc_now() + timedelta(minutes=30), } } } ) registry = FakeRegistry() flow = ReviewFlowHarness(state=state, registry=registry) response = await flow._try_collect_and_schedule_review( message="16h", user_id=21, extracted_fields={}, intents={}, turn_decision={"intent": "order_create", "domain": "sales", "action": "answer_user"}, ) self.assertEqual(registry.calls[0][0], "agendar_revisao") self.assertEqual(registry.calls[0][1]["data_hora"], "13/03/2026 16:00") self.assertIsNone(state.get_entry("pending_review_drafts", 21)) self.assertIn("REV-TESTE-123", response) async def test_review_flow_extracts_model_year_km_and_review_history_from_free_text(self): state = FakeState( entries={ "pending_review_drafts": { 21: { "payload": {"placa": "ABC1269", "data_hora": "13/03/2026 16:00"}, "expires_at": utc_now() + timedelta(minutes=30), } } } ) registry = FakeRegistry() flow = ReviewFlowHarness(state=state, registry=registry) response = await flow._try_collect_and_schedule_review( message="O modelo do meu carro e um Onix e ele e 2021, 30000 km, nunca fiz revisao", user_id=21, extracted_fields={}, intents={}, turn_decision={"intent": "review_schedule", "domain": "review", "action": "answer_user"}, ) self.assertIsNone(state.get_entry("pending_review_drafts", 21)) self.assertEqual(registry.calls[0][0], "agendar_revisao") _, arguments, tool_user_id = registry.calls[0] self.assertEqual(tool_user_id, 21) self.assertEqual(arguments.get("modelo"), "Um Onix") self.assertEqual(arguments.get("ano"), 2021) self.assertEqual(arguments.get("km"), 30000) self.assertFalse(arguments.get("revisao_previa_concessionaria")) self.assertIn("REV-TESTE-123", response) async def test_review_flow_extracts_short_vehicle_summary_from_free_text(self): state = FakeState( entries={ "pending_review_drafts": { 21: { "payload": {"placa": "ABC1269", "data_hora": "13/03/2026 16:00"}, "expires_at": utc_now() + timedelta(minutes=30), } } } ) registry = FakeRegistry() flow = ReviewFlowHarness(state=state, registry=registry) response = await flow._try_collect_and_schedule_review( message="Onix 2024, 12000 km, nao fiz revisao na concessionaria", user_id=21, extracted_fields={}, intents={}, turn_decision={"intent": "review_schedule", "domain": "review", "action": "answer_user"}, ) self.assertIsNone(state.get_entry("pending_review_drafts", 21)) self.assertEqual(registry.calls[0][0], "agendar_revisao") _, arguments, tool_user_id = registry.calls[0] self.assertEqual(tool_user_id, 21) self.assertEqual(arguments.get("modelo"), "Onix") self.assertEqual(arguments.get("ano"), 2024) self.assertEqual(arguments.get("km"), 12000) self.assertFalse(arguments.get("revisao_previa_concessionaria")) self.assertIn("REV-TESTE-123", response) async def test_review_flow_accepts_bare_model_when_it_is_last_missing_field(self): state = FakeState( entries={ "pending_review_drafts": { 21: { "payload": { "placa": "ABC1269", "data_hora": "13/03/2026 16:00", "ano": 2024, "km": 12000, "revisao_previa_concessionaria": False, }, "expires_at": utc_now() + timedelta(minutes=30), } } } ) registry = FakeRegistry() flow = ReviewFlowHarness(state=state, registry=registry) response = await flow._try_collect_and_schedule_review( message="Onix", user_id=21, extracted_fields={}, intents={}, turn_decision={"intent": "review_schedule", "domain": "review", "action": "answer_user"}, ) self.assertIsNone(state.get_entry("pending_review_drafts", 21)) self.assertEqual(registry.calls[0][0], "agendar_revisao") _, arguments, tool_user_id = registry.calls[0] self.assertEqual(tool_user_id, 21) self.assertEqual(arguments.get("modelo"), "Onix") self.assertIn("REV-TESTE-123", response) async def test_review_flow_keeps_plate_and_datetime_across_incremental_messages(self): fixed_now = lambda: datetime(2026, 3, 12, 9, 0) state = FakeState() registry = FakeRegistry() flow = ReviewFlowHarness(state=state, registry=registry, review_now_provider=fixed_now) await flow._try_collect_and_schedule_review( message="gostaria de marcar uma nova revisao agora", user_id=21, extracted_fields={}, intents={}, turn_decision={"intent": "review_schedule", "domain": "review", "action": "ask_missing_fields"}, ) await flow._try_collect_and_schedule_review( message="placa ABC1269", user_id=21, extracted_fields={}, intents={}, turn_decision={"intent": "review_schedule", "domain": "review", "action": "answer_user"}, ) await flow._try_collect_and_schedule_review( message="Eu gostaria de marcar amanha as 16 horas", user_id=21, extracted_fields={}, intents={}, turn_decision={"intent": "review_schedule", "domain": "review", "action": "answer_user"}, ) await flow._try_collect_and_schedule_review( message="O modelo do meu carro e um Onix e ele e 2021", user_id=21, extracted_fields={}, intents={}, turn_decision={"intent": "review_schedule", "domain": "review", "action": "answer_user"}, ) response = await flow._try_collect_and_schedule_review( message="30000 km, nunca fiz revisao", user_id=21, extracted_fields={}, intents={}, turn_decision={"intent": "review_schedule", "domain": "review", "action": "answer_user"}, ) self.assertIsNone(state.get_entry("pending_review_drafts", 21)) self.assertEqual(registry.calls[0][0], "agendar_revisao") _, arguments, tool_user_id = registry.calls[0] self.assertEqual(tool_user_id, 21) self.assertEqual(arguments.get("placa"), "ABC1269") self.assertEqual(arguments.get("data_hora"), "13/03/2026 16:00") self.assertEqual(arguments.get("modelo"), "Um Onix") self.assertEqual(arguments.get("ano"), 2021) self.assertEqual(arguments.get("km"), 30000) self.assertFalse(arguments.get("revisao_previa_concessionaria")) self.assertIn("REV-TESTE-123", response) async def test_review_flow_bootstraps_from_active_review_context_when_draft_is_missing(self): state = FakeState( contexts={ 21: { "active_domain": "review", "generic_memory": {}, "shared_memory": {}, "order_queue": [], "pending_order_selection": None, "pending_switch": None, "last_stock_results": [], "selected_vehicle": None, } } ) registry = FakeRegistry() flow = ReviewFlowHarness(state=state, registry=registry) response = await flow._try_collect_and_schedule_review( message="placa ABC1269", user_id=21, extracted_fields={}, intents={}, turn_decision={"intent": "general", "domain": "general", "action": "answer_user"}, ) draft = state.get_entry("pending_review_drafts", 21) self.assertIsNotNone(draft) self.assertEqual(draft["payload"]["placa"], "ABC1269") self.assertIn("a data e hora desejada para a revisao", response) self.assertTrue( any(payload.get("review_flow_source") == "active_domain_fallback" for _, payload in flow.logged_events) ) async def test_review_flow_restores_draft_from_context_snapshot_when_bucket_is_missing(self): state = FakeState( contexts={ 21: { "active_domain": "review", "active_task": "review_schedule", "generic_memory": {}, "shared_memory": {}, "collected_slots": { "review_schedule": {"placa": "A0T1C23", "data_hora": "14/03/2026 18:00"}, }, "flow_snapshots": { "review_schedule": { "payload": {"placa": "A0T1C23", "data_hora": "14/03/2026 18:00"}, "expires_at": utc_now() + timedelta(minutes=30), } }, "order_queue": [], "pending_order_selection": None, "pending_switch": None, "last_stock_results": [], "selected_vehicle": None, } } ) registry = FakeRegistry() flow = ReviewFlowHarness(state=state, registry=registry) await flow._try_collect_and_schedule_review( message="o modelo e Onix e e 2024", user_id=21, extracted_fields={"modelo": "Onix", "ano": 2024}, intents={}, turn_decision={"intent": "review_schedule", "domain": "review", "action": "answer_user"}, ) response = await flow._try_collect_and_schedule_review( message="20000 km, nunca fiz revisao na concessionaria", user_id=21, extracted_fields={}, intents={}, turn_decision={"intent": "review_schedule", "domain": "review", "action": "answer_user"}, ) self.assertEqual(registry.calls[0][0], "agendar_revisao") _, arguments, tool_user_id = registry.calls[0] self.assertEqual(tool_user_id, 21) self.assertEqual(arguments.get("placa"), "A0T1C23") self.assertEqual(arguments.get("data_hora"), "14/03/2026 18:00") self.assertEqual(arguments.get("modelo"), "Onix") self.assertEqual(arguments.get("ano"), 2024) self.assertEqual(arguments.get("km"), 20000) self.assertFalse(arguments.get("revisao_previa_concessionaria")) self.assertIn("REV-TESTE-123", response) async def test_review_flow_offers_reuse_of_last_vehicle_package(self): state = FakeState( entries={ "last_review_packages": { 21: { "payload": { "placa": "ABC1234", "modelo": "Corolla", "ano": 2020, "km": 30000, "revisao_previa_concessionaria": True, }, "expires_at": utc_now() + timedelta(minutes=30), } } } ) registry = FakeRegistry() flow = ReviewFlowHarness(state=state, registry=registry) response = await flow._try_collect_and_schedule_review( message="gostaria de agendar uma nova revisao agora", user_id=21, extracted_fields={}, intents={}, turn_decision={"intent": "review_schedule", "domain": "review", "action": "collect_review_schedule"}, ) self.assertIn("Posso reutilizar os dados do ultimo veiculo", response) self.assertIn("Corolla", response) self.assertIn("ABC1234", response) self.assertIsNotNone(state.get_entry("pending_review_reuse_confirmations", 21)) self.assertTrue( any(payload.get("review_flow_source") == "last_review_package" for _, payload in flow.logged_events) ) async def test_review_flow_rejects_reuse_and_accepts_new_vehicle_in_same_message(self): state = FakeState( entries={ "pending_review_reuse_confirmations": { 21: { "payload": { "placa": "ABC1234", "modelo": "Corolla", "ano": 2020, "km": 30000, "revisao_previa_concessionaria": True, }, "expires_at": utc_now() + timedelta(minutes=30), } } } ) registry = FakeRegistry() flow = ReviewFlowHarness(state=state, registry=registry) response = await flow._try_collect_and_schedule_review( message="nao, agora e outro veiculo, placa ABC1269", user_id=21, extracted_fields={"placa": "ABC1269"}, intents={}, turn_decision={"intent": "review_schedule", "domain": "review", "action": "collect_review_schedule"}, ) draft = state.get_entry("pending_review_drafts", 21) self.assertIsNone(state.get_entry("pending_review_reuse_confirmations", 21)) self.assertIsNotNone(draft) self.assertEqual(draft["payload"].get("placa"), "ABC1269") self.assertIn("a data e hora desejada para a revisao", response) async def test_review_flow_rejects_reuse_without_new_vehicle_and_opens_fresh_draft(self): state = FakeState( entries={ "pending_review_reuse_confirmations": { 21: { "payload": { "placa": "ABC1234", "modelo": "Corolla", "ano": 2020, "km": 30000, "revisao_previa_concessionaria": True, }, "expires_at": utc_now() + timedelta(minutes=30), } } } ) registry = FakeRegistry() flow = ReviewFlowHarness(state=state, registry=registry) response = await flow._try_collect_and_schedule_review( message="nao", user_id=21, extracted_fields={}, intents={}, turn_decision={"intent": "review_schedule", "domain": "review", "action": "collect_review_schedule"}, ) draft = state.get_entry("pending_review_drafts", 21) self.assertIsNone(state.get_entry("pending_review_reuse_confirmations", 21)) self.assertIsNotNone(draft) self.assertEqual(draft["payload"], {}) self.assertIn("a placa do veiculo", response) async def test_review_flow_offers_reuse_again_on_next_new_schedule_after_previous_rejection(self): state = FakeState( entries={ "last_review_packages": { 21: { "payload": { "placa": "A0T1C23", "modelo": "Onix", "ano": 2024, "km": 20000, "revisao_previa_concessionaria": False, }, "expires_at": utc_now() + timedelta(minutes=30), } } } ) registry = FakeRegistry() flow = ReviewFlowHarness(state=state, registry=registry) first_response = await flow._try_collect_and_schedule_review( message="quero agendar uma revisao", user_id=21, extracted_fields={}, intents={}, turn_decision={"intent": "review_schedule", "domain": "review", "action": "collect_review_schedule"}, ) self.assertIn("Posso reutilizar os dados do ultimo veiculo", first_response) second_response = await flow._try_collect_and_schedule_review( message="nao", user_id=21, extracted_fields={}, intents={}, turn_decision={"intent": "review_schedule", "domain": "review", "action": "collect_review_schedule"}, ) self.assertIn("a placa do veiculo", second_response) self.assertIsNotNone(state.get_entry("pending_review_drafts", 21)) state.pop_entry("pending_review_drafts", 21) third_response = await flow._try_collect_and_schedule_review( message="quero agendar uma revisao", user_id=21, extracted_fields={}, intents={}, turn_decision={"intent": "review_schedule", "domain": "review", "action": "collect_review_schedule"}, ) self.assertIn("Posso reutilizar os dados do ultimo veiculo", third_response) self.assertIsNotNone(state.get_entry("pending_review_reuse_confirmations", 21)) async def test_review_flow_offers_reuse_on_active_domain_fallback_without_explicit_llm_intent(self): state = FakeState( entries={ "last_review_packages": { 21: { "payload": { "placa": "ABC1C23", "modelo": "Onix", "ano": 2024, "km": 20000, "revisao_previa_concessionaria": False, }, "expires_at": utc_now() + timedelta(minutes=30), } } }, contexts={ 21: { "active_domain": "review", "generic_memory": {"placa": "ABC1C23"}, "shared_memory": {"placa": "ABC1C23"}, "last_stock_results": [], "selected_vehicle": None, } }, ) registry = FakeRegistry() flow = ReviewFlowHarness(state=state, registry=registry) response = await flow._try_collect_and_schedule_review( message="quero agendar uma revisao", user_id=21, extracted_fields={}, intents={}, turn_decision={"intent": "general", "domain": "general", "action": "answer_user"}, ) self.assertIn("Posso reutilizar os dados do ultimo veiculo", response) self.assertIsNotNone(state.get_entry("pending_review_reuse_confirmations", 21)) async def test_review_flow_explicit_reuse_request_opens_reuse_confirmation_even_without_pending_prompt(self): state = FakeState( entries={ "last_review_packages": { 21: { "payload": { "placa": "A0T1C23", "modelo": "Onix", "ano": 2024, "km": 20000, "revisao_previa_concessionaria": False, }, "expires_at": utc_now() + timedelta(minutes=30), } } } ) registry = FakeRegistry() flow = ReviewFlowHarness(state=state, registry=registry) response = await flow._try_collect_and_schedule_review( message="eu gostaria de reaproveitar as informacoes do ultimo carro", user_id=21, extracted_fields={}, intents={}, turn_decision={"intent": "general", "domain": "general", "action": "answer_user"}, ) self.assertIn("Posso reutilizar os dados do ultimo veiculo", response) self.assertIsNotNone(state.get_entry("pending_review_reuse_confirmations", 21)) async def test_review_flow_reuses_vehicle_with_date_only_and_requests_missing_time(self): state = FakeState( entries={ "pending_review_reuse_confirmations": { 21: { "payload": { "placa": "ABC1234", "modelo": "Onix", "ano": 2024, "km": 50000, "revisao_previa_concessionaria": False, }, "expires_at": utc_now() + timedelta(minutes=30), } } } ) registry = FakeRegistry() flow = ReviewFlowHarness(state=state, registry=registry) response = await flow._try_collect_and_schedule_review( message="Sim, quero marcar uma para o dia 18/08/2026", user_id=21, extracted_fields={"data_hora": "18/08/2026 00:00"}, intents={}, turn_decision={"intent": "review_schedule", "domain": "review", "action": "collect_review_schedule"}, ) draft = state.get_entry("pending_review_drafts", 21) self.assertIsNotNone(draft) self.assertNotIn("data_hora", draft["payload"]) self.assertEqual(draft["payload"].get("data_hora_base"), "18/08/2026") self.assertIn("Agora me informe o horario desejado", response) final_response = await flow._try_collect_and_schedule_review( message="as 10 horas", user_id=21, extracted_fields={}, intents={}, turn_decision={"intent": "review_schedule", "domain": "review", "action": "collect_review_schedule"}, ) self.assertEqual(registry.calls[0][0], "agendar_revisao") self.assertEqual(registry.calls[0][1]["data_hora"], "18/08/2026 10:00") self.assertIn("REV-TESTE-123", final_response) async def test_review_flow_reuses_vehicle_with_relative_date_only_and_requests_missing_time(self): state = FakeState( entries={ "pending_review_reuse_confirmations": { 21: { "payload": { "placa": "ABC1263", "modelo": "Onix", "ano": 2024, "km": 50000, "revisao_previa_concessionaria": False, }, "expires_at": utc_now() + timedelta(minutes=30), } } } ) registry = FakeRegistry() flow = ReviewFlowHarness(state=state, registry=registry) today_text = datetime.now().strftime("%d/%m/%Y") response = await flow._try_collect_and_schedule_review( message="eu gostaria de marcar para hoje", user_id=21, extracted_fields={}, intents={}, turn_decision={"intent": "review_schedule", "domain": "review", "action": "collect_review_schedule"}, ) draft = state.get_entry("pending_review_drafts", 21) self.assertIsNotNone(draft) self.assertEqual(draft["payload"].get("data_hora_base"), today_text) self.assertIn("Agora me informe o horario desejado", response) final_response = await flow._try_collect_and_schedule_review( message="as 14 horas", user_id=21, extracted_fields={}, intents={}, turn_decision={"intent": "review_schedule", "domain": "review", "action": "collect_review_schedule"}, ) self.assertEqual(registry.calls[0][0], "agendar_revisao") self.assertEqual(registry.calls[0][1]["data_hora"], f"{today_text} 14:00") self.assertIn("REV-TESTE-123", final_response) async def test_review_flow_reuse_confirmation_accepts_relative_date_then_time_in_follow_up(self): state = FakeState( entries={ "pending_review_reuse_confirmations": { 21: { "payload": { "placa": "ABC1263", "modelo": "Onix", "ano": 2024, "km": 50000, "revisao_previa_concessionaria": False, }, "expires_at": utc_now() + timedelta(minutes=30), } } } ) registry = FakeRegistry() flow = ReviewFlowHarness(state=state, registry=registry) today_text = datetime.now().strftime("%d/%m/%Y") first_response = await flow._try_collect_and_schedule_review( message="sim", user_id=21, extracted_fields={}, intents={}, turn_decision={"intent": "review_schedule", "domain": "review", "action": "collect_review_schedule"}, ) self.assertIn("Me informe apenas a data e hora desejada", first_response) second_response = await flow._try_collect_and_schedule_review( message="quero marcar para hoje", user_id=21, extracted_fields={}, intents={}, turn_decision={"intent": "review_schedule", "domain": "review", "action": "collect_review_schedule"}, ) draft = state.get_entry("pending_review_drafts", 21) self.assertIsNotNone(draft) self.assertEqual(draft["payload"].get("data_hora_base"), today_text) self.assertIn("Agora me informe o horario desejado", second_response) final_response = await flow._try_collect_and_schedule_review( message="as 14 horas", user_id=21, extracted_fields={}, intents={}, turn_decision={"intent": "review_schedule", "domain": "review", "action": "collect_review_schedule"}, ) self.assertEqual(registry.calls[0][0], "agendar_revisao") self.assertEqual(registry.calls[0][1]["data_hora"], f"{today_text} 14:00") self.assertIn("REV-TESTE-123", final_response) async def test_review_flow_clears_stale_pending_confirmation_when_user_starts_new_schedule(self): state = FakeState( entries={ "pending_review_confirmations": { 21: { "payload": { "placa": "ABC9999", "data_hora": "14/03/2026 10:00", "modelo": "Corolla", "ano": 2020, "km": 30000, "revisao_previa_concessionaria": True, }, "expires_at": utc_now() + timedelta(minutes=30), } } } ) registry = FakeRegistry() flow = ReviewFlowHarness(state=state, registry=registry) response = await flow._try_collect_and_schedule_review( message="quero agendar uma revisao", user_id=21, extracted_fields={}, intents={}, turn_decision={"intent": "review_schedule", "domain": "review", "action": "collect_review_schedule"}, ) self.assertIsNone(state.get_entry("pending_review_confirmations", 21)) self.assertIsNotNone(state.get_entry("pending_review_drafts", 21)) self.assertIn("a placa do veiculo", response) async def test_review_flow_keeps_draft_and_clears_data_hora_on_retryable_error(self): state = FakeState( entries={ "pending_review_drafts": { 21: { "payload": { "placa": "ABC1234", "data_hora": "2026-03-10T09:00:00-03:00", "modelo": "HB20", "ano": 2022, "km": 15000, "revisao_previa_concessionaria": True, }, "expires_at": utc_now() + timedelta(minutes=30), } } } ) registry = FakeRegistry() registry.raise_http_exception = HTTPException( status_code=409, detail={ "code": "review_schedule_conflict", "message": "O horario solicitado esta ocupado.", "retryable": True, "field": "data_hora", "suggested_iso": "2026-03-10T09:30:00-03:00", }, ) flow = ReviewFlowHarness(state=state, registry=registry) response = await flow._try_collect_and_schedule_review( message="agendar revisao", user_id=21, extracted_fields={}, intents={}, turn_decision={"intent": "review_schedule", "domain": "review", "action": "call_tool"}, ) draft = state.get_entry("pending_review_drafts", 21) self.assertIn("ocupado", response) self.assertIsNotNone(draft) self.assertEqual(draft["payload"].get("placa"), "ABC1234") self.assertNotIn("data_hora", draft["payload"]) async def test_review_management_preserves_reason_from_single_cancel_message(self): state = FakeState() registry = FakeRegistry() flow = ReviewFlowHarness(state=state, registry=registry) response = await flow._try_handle_review_management( message="Quero cancelar a revisao do protocolo REV-20260313-F754AF27 porque nao vou poder ir", user_id=21, extracted_fields={}, intents={}, turn_decision={"intent": "review_cancel", "domain": "review", "action": "answer_user"}, ) self.assertEqual(registry.calls[0][0], "cancelar_agendamento_revisao") self.assertEqual(registry.calls[0][1]["protocolo"], "REV-20260313-F754AF27") self.assertEqual(registry.calls[0][1]["motivo"], "nao vou poder ir") self.assertIn("cancelar_agendamento_revisao", response) async def test_review_management_infers_cancel_intent_from_protocol_message(self): state = FakeState() registry = FakeRegistry() flow = ReviewFlowHarness(state=state, registry=registry) response = await flow._try_handle_review_management( message="eu gostaria de cancelar o meu agendamento REV-20260313-F754AF27", user_id=21, extracted_fields={}, intents={}, turn_decision={"intent": "review_schedule", "domain": "review", "action": "answer_user"}, ) self.assertEqual(registry.calls[0][0], "cancelar_agendamento_revisao") self.assertEqual(registry.calls[0][1]["protocolo"], "REV-20260313-F754AF27") self.assertIn("cancelar_agendamento_revisao", response) self.assertIn("REV-20260313-F754AF27", response) async def test_review_management_reschedule_consumes_relative_datetime_follow_up(self): fixed_now = lambda: datetime(2026, 3, 12, 9, 0) state = FakeState( entries={ "pending_review_management_drafts": { 21: { "action": "reschedule", "payload": {"protocolo": "REV-20260313-F754AF27"}, "expires_at": utc_now() + timedelta(minutes=30), } } } ) registry = FakeRegistry() flow = ReviewFlowHarness(state=state, registry=registry, review_now_provider=fixed_now) response = await flow._try_handle_review_management( message="amanha 11h", user_id=21, extracted_fields={}, intents={}, turn_decision={"intent": "review_reschedule", "domain": "review", "action": "answer_user"}, ) self.assertIsNone(state.get_entry("pending_review_management_drafts", 21)) self.assertEqual(registry.calls[0][0], "editar_data_revisao") self.assertEqual(registry.calls[0][1]["protocolo"], "REV-20260313-F754AF27") self.assertEqual(registry.calls[0][1]["nova_data_hora"], "13/03/2026 11:00") self.assertIn("13/03/2026 11:00", response) async def test_review_management_reschedule_consumes_day_after_tomorrow_relative_datetime_follow_up(self): fixed_now = lambda: datetime(2026, 3, 12, 9, 0) state = FakeState( entries={ "pending_review_management_drafts": { 21: { "action": "reschedule", "payload": {"protocolo": "REV-20260313-F754AF27"}, "expires_at": utc_now() + timedelta(minutes=30), } } } ) registry = FakeRegistry() flow = ReviewFlowHarness(state=state, registry=registry, review_now_provider=fixed_now) response = await flow._try_handle_review_management( message="depois de amanha 11h", user_id=21, extracted_fields={}, intents={}, turn_decision={"intent": "review_reschedule", "domain": "review", "action": "answer_user"}, ) self.assertIsNone(state.get_entry("pending_review_management_drafts", 21)) self.assertEqual(registry.calls[0][0], "editar_data_revisao") self.assertEqual(registry.calls[0][1]["protocolo"], "REV-20260313-F754AF27") self.assertEqual(registry.calls[0][1]["nova_data_hora"], "14/03/2026 11:00") self.assertIn("14/03/2026 11:00", response) async def test_review_management_reschedule_date_only_then_time_follow_up(self): fixed_now = lambda: datetime(2026, 3, 12, 9, 0) state = FakeState( entries={ "pending_review_management_drafts": { 21: { "action": "reschedule", "payload": {"protocolo": "REV-20260313-F754AF27"}, "expires_at": utc_now() + timedelta(minutes=30), } } } ) registry = FakeRegistry() flow = ReviewFlowHarness(state=state, registry=registry, review_now_provider=fixed_now) first_response = await flow._try_handle_review_management( message="amanha", user_id=21, extracted_fields={}, intents={}, turn_decision={"intent": "review_reschedule", "domain": "review", "action": "answer_user"}, ) draft = state.get_entry("pending_review_management_drafts", 21) self.assertIsNotNone(draft) self.assertEqual(draft["payload"].get("nova_data_hora_base"), "13/03/2026") self.assertIn("Perfeito. Tenho a data 13/03/2026.", first_response) second_response = await flow._try_handle_review_management( message="11h", user_id=21, extracted_fields={}, intents={}, turn_decision={"intent": "review_reschedule", "domain": "review", "action": "answer_user"}, ) self.assertIsNone(state.get_entry("pending_review_management_drafts", 21)) self.assertEqual(registry.calls[0][0], "editar_data_revisao") self.assertEqual(registry.calls[0][1]["nova_data_hora"], "13/03/2026 11:00") self.assertIn("13/03/2026 11:00", second_response) async def test_review_management_reschedule_conflict_stores_pending_confirmation_suggestion(self): fixed_now = lambda: datetime(2026, 3, 12, 9, 0) state = FakeState( entries={ "pending_review_management_drafts": { 21: { "action": "reschedule", "payload": {"protocolo": "REV-20260313-F754AF27"}, "expires_at": utc_now() + timedelta(minutes=30), } } } ) registry = FakeRegistry() registry.raise_http_exception = HTTPException( status_code=409, detail={ "code": "review_reschedule_conflict", "message": "O horario 14/03/2026 as 11:00 ja esta ocupado. Posso agendar em 14/03/2026 as 12:00.", "retryable": True, "field": "nova_data_hora", "suggested_iso": "2026-03-14T12:00:00-03:00", }, ) flow = ReviewFlowHarness(state=state, registry=registry, review_now_provider=fixed_now) response = await flow._try_handle_review_management( message="depois de amanha 11h", user_id=21, extracted_fields={}, intents={}, turn_decision={"intent": "review_reschedule", "domain": "review", "action": "answer_user"}, ) draft = state.get_entry("pending_review_management_drafts", 21) self.assertIsNotNone(draft) self.assertNotIn("nova_data_hora", draft["payload"]) self.assertEqual(len(flow.captured_suggestions), 1) suggestion = flow.captured_suggestions[0] self.assertEqual(suggestion["tool_name"], "editar_data_revisao") self.assertEqual(suggestion["arguments"]["protocolo"], "REV-20260313-F754AF27") self.assertEqual(suggestion["arguments"]["nova_data_hora"], "14/03/2026 11:00") self.assertIn("ocupado", response) async def test_review_management_infers_listing_intent_from_agendamentos_message(self): state = FakeState() registry = FakeRegistry() flow = ReviewFlowHarness(state=state, registry=registry) response = await flow._try_handle_review_management( message="liste para mim os meus agendamentos de revisao", user_id=21, extracted_fields={}, intents={}, turn_decision={"intent": "general", "domain": "general", "action": "answer_user"}, ) self.assertEqual(registry.calls[0][0], "listar_agendamentos_revisao") self.assertEqual(registry.calls[0][1]["limite"], 100) self.assertIn("listar_agendamentos_revisao", response) async def test_review_schedule_clears_open_management_draft(self): state = FakeState( entries={ "pending_review_management_drafts": { 21: { "action": "reschedule", "payload": {"protocolo": "REV-20260313-F754AF27"}, "expires_at": utc_now() + timedelta(minutes=30), } } } ) registry = FakeRegistry() flow = ReviewFlowHarness(state=state, registry=registry) response = await flow._try_handle_review_management( message="quero agendar uma revisao", user_id=21, extracted_fields={}, intents={}, turn_decision={"intent": "review_schedule", "domain": "review", "action": "answer_user"}, ) self.assertIsNone(response) self.assertIsNone(state.get_entry("pending_review_management_drafts", 21)) async def test_review_management_does_not_override_open_schedule_draft_without_protocol(self): state = FakeState( entries={ "pending_review_drafts": { 21: { "payload": { "placa": "ABC1234", "modelo": "Corolla", "ano": 2020, "km": 30000, "revisao_previa_concessionaria": True, }, "expires_at": utc_now() + timedelta(minutes=30), } } } ) registry = FakeRegistry() flow = ReviewFlowHarness(state=state, registry=registry) response = await flow._try_handle_review_management( message="pode ser hoje as 17:30", user_id=21, extracted_fields={}, intents={}, turn_decision={"intent": "review_reschedule", "domain": "review", "action": "answer_user"}, ) self.assertIsNone(response) self.assertIsNone(state.get_entry("pending_review_management_drafts", 21)) async def test_review_schedule_flow_ignores_management_message_with_protocol(self): state = FakeState( contexts={ 21: { "active_domain": "review", "generic_memory": {}, "shared_memory": {}, "order_queue": [], "pending_order_selection": None, "pending_switch": None, "last_stock_results": [], "selected_vehicle": None, } } ) registry = FakeRegistry() flow = ReviewFlowHarness(state=state, registry=registry) response = await flow._try_collect_and_schedule_review( message="eu gostaria de cancelar o meu agendamento REV-20260313-F754AF27", user_id=21, extracted_fields={}, intents={}, turn_decision={"intent": "review_schedule", "domain": "review", "action": "answer_user"}, ) self.assertIsNone(response) self.assertEqual(registry.calls, []) async def test_review_flow_does_not_bootstrap_sales_message_from_active_review_context(self): state = FakeState( contexts={ 21: { "active_domain": "review", "generic_memory": {}, "shared_memory": {}, "order_queue": [], "pending_order_selection": None, "pending_switch": None, "last_stock_results": [], "selected_vehicle": None, } } ) registry = FakeRegistry() flow = ReviewFlowHarness(state=state, registry=registry) response = await flow._try_collect_and_schedule_review( message="quero comprar um carro de ate 70 mil", user_id=21, extracted_fields={}, intents={}, turn_decision={"intent": "general", "domain": "general", "action": "answer_user"}, ) self.assertIsNone(response) self.assertIsNone(state.get_entry("pending_review_drafts", 21)) class ContextSwitchPolicyTests(unittest.TestCase): def test_handle_context_switch_ignores_ambiguous_confirmation_while_review_reuse_is_pending(self): state = FakeState( entries={ "pending_review_reuse_confirmations": { 9: { "payload": { "placa": "ABC1463", "modelo": "Civic", "ano": 2024, "km": 30000, "revisao_previa_concessionaria": False, }, "expires_at": utc_now() + timedelta(minutes=15), } } }, contexts={ 9: { "pending_switch": None, "active_domain": "review", "generic_memory": {}, "pending_order_selection": None, } }, ) service = FakeService(state) policy = ConversationPolicy(service=service) response = policy.handle_context_switch( message="sim", user_id=9, target_domain_hint="sales", turn_decision={"domain": "sales", "intent": "order_create", "action": "answer_user"}, ) self.assertIsNone(response) self.assertIsNone(service._get_user_context(9).get("pending_switch")) def test_handle_context_switch_still_confirms_explicit_domain_change_with_open_review_flow(self): state = FakeState( entries={ "pending_review_reuse_confirmations": { 9: { "payload": { "placa": "ABC1463", "modelo": "Civic", "ano": 2024, "km": 30000, "revisao_previa_concessionaria": False, }, "expires_at": utc_now() + timedelta(minutes=15), } } }, contexts={ 9: { "pending_switch": None, "active_domain": "review", "generic_memory": {}, "pending_order_selection": None, } }, ) service = FakeService(state) policy = ConversationPolicy(service=service) response = policy.handle_context_switch( message="agora quero comprar um carro", user_id=9, target_domain_hint="sales", turn_decision={"domain": "sales", "intent": "order_create", "action": "answer_user"}, ) self.assertEqual( response, "Entendi que voce quer sair de agendamento de revisao e ir para compra de veiculo. Tem certeza?", ) self.assertIsNotNone(service._get_user_context(9).get("pending_switch")) def test_render_context_switched_message_guides_next_step_for_sales(self): state = FakeState() service = FakeService(state) policy = ConversationPolicy(service=service) response = policy.render_context_switched_message("sales") self.assertEqual( response, "Certo, contexto anterior encerrado. Vamos seguir com compra de veiculo.\n" "Pode me dizer a faixa de preco, o modelo ou o tipo de carro que voce procura.", ) def test_handle_context_switch_drops_stale_pending_switch_when_user_starts_other_domain(self): state = FakeState( contexts={ 9: { "pending_switch": { "target_domain": "sales", "expires_at": utc_now() + timedelta(minutes=15), }, "active_domain": "general", "generic_memory": {}, "pending_order_selection": None, } } ) service = FakeService(state) policy = ConversationPolicy(service=service) response = policy.handle_context_switch( message="quero agendar revisao", user_id=9, target_domain_hint="review", turn_decision={"domain": "review", "intent": "review_schedule", "action": "collect_review_schedule"}, ) self.assertIsNone(response) self.assertIsNone(service._get_user_context(9).get("pending_switch")) class ToolRegistryExecutionTests(unittest.IsolatedAsyncioTestCase): async def test_execute_ignores_extra_arguments_for_review_listing_tool(self): async def fake_listar_agendamentos_revisao( user_id: int | None = None, placa: str | None = None, status: str | None = None, limite: int | None = 20, ): return [{"user_id": user_id, "placa": placa, "status": status, "limite": limite}] registry = ToolRegistry.__new__(ToolRegistry) registry._tools = [ ToolDefinition( name="listar_agendamentos_revisao", description="", parameters={}, handler=fake_listar_agendamentos_revisao, ) ] result = await registry.execute( "listar_agendamentos_revisao", {"placa": "ABC1234", "status": "agendado", "limite": 10, "tipo": "revisao"}, user_id=21, ) self.assertEqual( result, [{"user_id": 21, "placa": "ABC1234", "status": "agendado", "limite": 10}], ) if __name__ == "__main__": unittest.main()