You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
orquestrador/tests/test_conversation_adjustmen...

855 lines
34 KiB
Python

import os
import unittest
from datetime import datetime, timedelta
from unittest.mock import patch
# Default DEBUG to "false" unless the environment already defines it. This sits
# ahead of the app imports below, presumably because they read the variable at
# import time (TODO confirm before reordering the imports).
os.environ.setdefault("DEBUG", "false")
from fastapi import HTTPException
from app.services.flows.order_flow import OrderFlowMixin
from app.services.flows.review_flow import ReviewFlowMixin
from app.integrations.telegram_satellite_service import _ensure_supported_runtime_configuration
from app.services.orchestration.conversation_policy import ConversationPolicy
from app.services.orchestration.entity_normalizer import EntityNormalizer
from app.services.tools.handlers import _parse_data_hora_revisao
class FakeState:
def __init__(self, entries=None, contexts=None):
self.entries = entries or {}
self.contexts = contexts or {}
def get_entry(self, bucket: str, user_id: int | None, *, expire: bool = False):
if user_id is None:
return None
return self.entries.get(bucket, {}).get(user_id)
def set_entry(self, bucket: str, user_id: int | None, value: dict):
if user_id is None:
return
self.entries.setdefault(bucket, {})[user_id] = value
def pop_entry(self, bucket: str, user_id: int | None):
if user_id is None:
return None
return self.entries.get(bucket, {}).pop(user_id, None)
def get_user_context(self, user_id: int | None):
if user_id is None:
return None
return self.contexts.get(user_id)
class FakeService:
def __init__(self, state):
self.state = state
self.normalizer = EntityNormalizer()
def _is_affirmative_message(self, text: str) -> bool:
normalized = self.normalizer.normalize_text(text).strip().rstrip(".!?,;:")
return normalized in {"sim", "pode", "ok", "confirmo", "aceito", "fechado", "pode sim", "tenho", "tenho sim"}
def _get_user_context(self, user_id: int | None):
return self.state.get_user_context(user_id)
class FakeRegistry:
def __init__(self):
self.calls = []
self.raise_http_exception = None
async def execute(self, tool_name: str, arguments: dict, user_id: int | None = None):
self.calls.append((tool_name, arguments, user_id))
if self.raise_http_exception is not None:
raise self.raise_http_exception
if tool_name == "consultar_estoque":
return [
{"id": 1, "modelo": "Honda Civic 2021", "categoria": "sedan", "preco": 48500.0},
{"id": 2, "modelo": "Toyota Yaris 2020", "categoria": "hatch", "preco": 49900.0},
]
if tool_name == "realizar_pedido":
vehicle_map = {
1: ("Honda Civic 2021", 51524.0),
2: ("Toyota Corolla 2020", 58476.0),
3: ("Chevrolet Onix 2022", 51809.0),
7: ("Fiat Argo 2020", 61857.0),
}
modelo_veiculo, valor_veiculo = vehicle_map[arguments["vehicle_id"]]
return {
"numero_pedido": "PED-TESTE-123",
"status": "Ativo",
"modelo_veiculo": modelo_veiculo,
"valor_veiculo": valor_veiculo,
}
return {
"numero_pedido": arguments["numero_pedido"],
"status": "Cancelado",
"motivo": arguments["motivo"],
}
def coerce_http_error(self, exc):
detail = exc.detail if isinstance(exc.detail, dict) else {"message": str(exc.detail)}
return {
"code": detail.get("code", "tool_error"),
"message": detail.get("message", str(exc.detail)),
"retryable": bool(detail.get("retryable", False)),
"field": detail.get("field"),
}
class OrderFlowHarness(OrderFlowMixin):
    """Concrete host for OrderFlowMixin wired to the fake state and registry.

    The methods below are the hooks this harness supplies to the mixin: most
    delegate to EntityNormalizer, the rest return simplified values.
    """

    def __init__(self, state, registry):
        self.state = state
        # The same fake is exposed under both attribute names.
        self.registry = registry
        self.tool_executor = registry
        self.normalizer = EntityNormalizer()

    def _get_user_context(self, user_id: int | None):
        """Return the user's context from the fake state."""
        return self.state.get_user_context(user_id)

    def _normalize_intents(self, data) -> dict:
        return self.normalizer.normalize_intents(data)

    def _normalize_cancel_order_fields(self, data) -> dict:
        return self.normalizer.normalize_cancel_order_fields(data)

    def _normalize_order_fields(self, data) -> dict:
        return self.normalizer.normalize_order_fields(data)

    def _normalize_text(self, text: str) -> str:
        return self.normalizer.normalize_text(text)

    def _normalize_positive_number(self, value):
        return self.normalizer.normalize_positive_number(value)

    def _http_exception_detail(self, exc) -> str:
        """Use the exception's plain string form as the user-facing detail."""
        return str(exc)

    def _fallback_format_tool_result(self, tool_name: str, tool_result) -> str:
        """Render a tool result as plain text.

        Stock searches become a numbered list, order creation a summary, and
        anything else the order-status update message.
        """
        if tool_name == "consultar_estoque":
            header = f"Encontrei {len(tool_result)} veiculo(s):"
            numbered = [
                f"{position}. {vehicle['modelo']} ({vehicle['categoria']}) - R$ {vehicle['preco']:.2f}"
                for position, vehicle in enumerate(tool_result, start=1)
            ]
            footer = "Para escolher, responda com o numero da opcao desejada. Exemplo: 1."
            return "\n".join([header, *numbered, footer])

        if tool_name == "realizar_pedido":
            return "\n".join(
                [
                    "Pedido criado com sucesso.",
                    f"Numero: {tool_result['numero_pedido']}",
                    f"Veiculo: {tool_result['modelo_veiculo']}",
                    f"Valor: R$ {tool_result['valor_veiculo']:.2f}",
                ]
            )

        return "\n".join(
            [
                f"Pedido {tool_result['numero_pedido']} atualizado.",
                f"Status: {tool_result['status']}",
                f"Motivo: {tool_result['motivo']}",
            ]
        )

    def _try_prefill_order_cpf_from_user_profile(self, user_id: int | None, payload: dict) -> None:
        """Do nothing: this harness never prefills the CPF."""
        return None

    def _load_vehicle_by_id(self, vehicle_id: int) -> dict | None:
        """Return a copy of the first stored stock result whose id matches.

        Every user's ``last_stock_results`` is searched; None when no match.
        """
        matches = (
            dict(candidate)
            for user_context in self.state.contexts.values()
            for candidate in user_context.get("last_stock_results", [])
            if int(candidate["id"]) == int(vehicle_id)
        )
        return next(matches, None)
class ReviewFlowHarness(ReviewFlowMixin):
    """Concrete host for ReviewFlowMixin wired to the fake state and registry.

    Confirmation suggestions passed to the capture hook are collected in
    ``captured_suggestions``.
    """

    def __init__(self, state, registry):
        self.state = state
        # The same fake is exposed under both attribute names.
        self.registry = registry
        self.tool_executor = registry
        self.normalizer = EntityNormalizer()
        self.captured_suggestions = []

    def _normalize_intents(self, data) -> dict:
        return self.normalizer.normalize_intents(data)

    def _normalize_review_fields(self, data) -> dict:
        return self.normalizer.normalize_review_fields(data)

    def _normalize_review_management_fields(self, data) -> dict:
        return self.normalizer.normalize_review_management_fields(data)

    def _normalize_text(self, text: str) -> str:
        return self.normalizer.normalize_text(text)

    def _http_exception_detail(self, exc) -> str:
        """Prefer the ``message`` of a dict detail; otherwise stringify the exception."""
        raw_detail = exc.detail
        message = raw_detail.get("message") if isinstance(raw_detail, dict) else None
        return str(message or exc)

    def _fallback_format_tool_result(self, tool_name: str, tool_result) -> str:
        return f"{tool_name}:{tool_result}"

    def _extract_review_protocol_from_text(self, text: str) -> str | None:
        return self.normalizer.extract_review_protocol_from_text(text)

    def _is_affirmative_message(self, text: str) -> bool:
        """Tell whether ``text`` normalizes to one of the accepted "yes" replies."""
        accepted_replies = {"sim", "pode", "ok", "confirmo", "aceito", "fechado", "pode sim", "tenho", "tenho sim"}
        cleaned = self.normalizer.normalize_text(text).strip().rstrip(".!?,;:")
        return cleaned in accepted_replies

    def _is_negative_message(self, text: str) -> bool:
        """Tell whether ``text`` is a refusal: a listed phrase or anything starting with "nao"."""
        cleaned = self.normalizer.normalize_text(text).strip().rstrip(".!?,;:")
        if cleaned in {"nao", "nao quero", "prefiro outro", "outro horario"}:
            return True
        return cleaned.startswith("nao")

    def _capture_review_confirmation_suggestion(self, **kwargs) -> None:
        """Record the keyword arguments so tests can inspect them."""
        self.captured_suggestions.append(kwargs)

    def _try_prefill_review_fields_from_memory(self, user_id: int | None, payload: dict) -> None:
        """Do nothing: this harness never prefills review fields."""
        return None
class ConversationAdjustmentsTests(unittest.TestCase):
    """Synchronous checks for runtime configuration, cancellation policy and date parsing."""

    # Dotted path of the settings object patched by the runtime-configuration tests.
    _SETTINGS = "app.integrations.telegram_satellite_service.settings"

    def test_telegram_satellite_requires_redis_in_production(self):
        """Production with the in-memory state backend must be rejected."""
        with (
            patch(f"{self._SETTINGS}.environment", "production"),
            patch(f"{self._SETTINGS}.conversation_state_backend", "memory"),
            self.assertRaises(RuntimeError),
        ):
            _ensure_supported_runtime_configuration()

    def test_telegram_satellite_allows_redis_in_production(self):
        """Production with the redis state backend must pass without raising."""
        with (
            patch(f"{self._SETTINGS}.environment", "production"),
            patch(f"{self._SETTINGS}.conversation_state_backend", "redis"),
        ):
            _ensure_supported_runtime_configuration()

    def test_defer_flow_cancel_when_order_cancel_draft_waits_for_reason(self):
        """With a cancel draft pending, "desisti" is deferred but the explicit command is not."""
        pending_draft = {
            "payload": {"numero_pedido": "PED-20260305120000-ABC123"},
            "expires_at": datetime.utcnow() + timedelta(minutes=30),
        }
        state = FakeState(entries={"pending_cancel_order_drafts": {7: pending_draft}})
        policy = ConversationPolicy(service=FakeService(state))

        deferred = policy.should_defer_flow_cancellation_control("desisti", user_id=7)
        self.assertTrue(deferred)
        explicit = policy.should_defer_flow_cancellation_control("cancelar fluxo atual", user_id=7)
        self.assertFalse(explicit)

    def test_normalize_datetime_connector_accepts_as_com_acento(self):
        """The accented connector between date and time is dropped."""
        normalized = EntityNormalizer().normalize_datetime_connector("10/03/2026 às 09:00")
        self.assertEqual(normalized, "10/03/2026 09:00")

    def test_parse_review_datetime_accepts_as_com_acento(self):
        """The review date parser accepts the accented connector."""
        expected = datetime(2026, 3, 10, 9, 0)
        self.assertEqual(_parse_data_hora_revisao("10/03/2026 às 09:00"), expected)
class CancelOrderFlowTests(unittest.IsolatedAsyncioTestCase):
    """Async tests for the order-cancellation collection flow."""

    @staticmethod
    def _state_with_draft(bucket: str, payload: dict):
        """Build a FakeState holding one unexpired draft for user 42 in ``bucket``."""
        draft = {
            "payload": payload,
            "expires_at": datetime.utcnow() + timedelta(minutes=30),
        }
        return FakeState(entries={bucket: {42: draft}})

    async def test_cancel_order_flow_accepts_turn_decision_without_legacy_intents(self):
        """A turn decision alone starts the flow, which then asks for the reason."""
        state = FakeState()
        flow = OrderFlowHarness(state=state, registry=FakeRegistry())

        response = await flow._try_collect_and_cancel_order(
            message="cancelar pedido PED-20260305120000-ABC123",
            user_id=42,
            extracted_fields={"numero_pedido": "PED-20260305120000-ABC123"},
            intents={},
            turn_decision={"intent": "order_cancel", "domain": "sales", "action": "collect_order_cancel"},
        )

        self.assertIn("o motivo do cancelamento", response)
        self.assertIsNotNone(state.get_entry("pending_cancel_order_drafts", 42))

    async def test_cancel_order_flow_consumes_free_text_reason(self):
        """With a draft pending, a free-text message is used as the cancellation reason."""
        state = self._state_with_draft(
            "pending_cancel_order_drafts",
            {"numero_pedido": "PED-20260305120000-ABC123"},
        )
        registry = FakeRegistry()
        flow = OrderFlowHarness(state=state, registry=registry)

        response = await flow._try_collect_and_cancel_order(
            message="desisti",
            user_id=42,
            extracted_fields={},
            intents={},
        )

        self.assertEqual(len(registry.calls), 1)
        called_tool, called_arguments, called_user_id = registry.calls[0]
        self.assertEqual(called_tool, "cancelar_pedido")
        self.assertEqual(called_user_id, 42)
        self.assertEqual(called_arguments["numero_pedido"], "PED-20260305120000-ABC123")
        self.assertEqual(called_arguments["motivo"], "desisti")
        self.assertIn("Status: Cancelado", response)
        self.assertIsNone(state.get_entry("pending_cancel_order_drafts", 42))

    async def test_cancel_order_flow_still_requests_reason_when_message_is_too_short(self):
        """A bare "ok" is not taken as a reason; the draft stays and the question repeats."""
        state = self._state_with_draft(
            "pending_cancel_order_drafts",
            {"numero_pedido": "PED-20260305120000-ABC123"},
        )
        registry = FakeRegistry()
        flow = OrderFlowHarness(state=state, registry=registry)

        response = await flow._try_collect_and_cancel_order(
            message="ok",
            user_id=42,
            extracted_fields={},
            intents={},
        )

        self.assertEqual(registry.calls, [])
        self.assertIn("o motivo do cancelamento", response)
        self.assertIsNotNone(state.get_entry("pending_cancel_order_drafts", 42))

    async def test_cancel_order_flow_does_not_override_active_order_creation_draft(self):
        """While an order-creation draft is pending, the cancel flow stays out of the way."""
        state = self._state_with_draft("pending_order_drafts", {})
        registry = FakeRegistry()
        flow = OrderFlowHarness(state=state, registry=registry)

        response = await flow._try_collect_and_cancel_order(
            message="2",
            user_id=42,
            extracted_fields={},
            intents={"order_cancel": True},
        )

        self.assertIsNone(response)
        self.assertEqual(registry.calls, [])
class CreateOrderFlowWithVehicleTests(unittest.IsolatedAsyncioTestCase):
    """Async tests for the order-creation flow: stock listing, selection and tool errors.

    All tests act as user 10. The scaffolding every test used to repeat (the
    CPF-hydration stub and its patch, the pending-draft entry, the
    "order_create" turn decision, and the state/registry/flow wiring) lives in
    the private helpers below.
    """

    # Dotted path of the hydration helper as looked up by the order flow module.
    _HYDRATE_TARGET = "app.services.flows.order_flow.hydrate_mock_customer_from_cpf"

    @staticmethod
    async def _fake_hydrate_mock_customer_from_cpf(cpf: str, user_id: int | None = None):
        """Stand-in for the CPF hydration helper; simply echoes its arguments."""
        return {"cpf": cpf, "user_id": user_id}

    @staticmethod
    def _pending_draft(payload: dict) -> dict:
        """Build a draft entry around ``payload`` that expires 30 minutes from now."""
        return {
            "payload": payload,
            "expires_at": datetime.utcnow() + timedelta(minutes=30),
        }

    @staticmethod
    def _order_create_decision() -> dict:
        """Return a fresh "order_create" turn decision (a new dict per call)."""
        return {"intent": "order_create", "domain": "sales", "action": "collect_order_create"}

    def _build_flow(self, contexts: dict, draft_payload: dict | None = None):
        """Wire a FakeState, FakeRegistry and OrderFlowHarness together.

        Args:
            contexts: per-user context dicts handed to FakeState.
            draft_payload: when not None (an empty dict counts), user 10 gets
                a pending order draft carrying this payload.

        Returns:
            The ``(state, registry, flow)`` triple.
        """
        entries = None
        if draft_payload is not None:
            entries = {"pending_order_drafts": {10: self._pending_draft(draft_payload)}}
        state = FakeState(entries=entries, contexts=contexts)
        registry = FakeRegistry()
        flow = OrderFlowHarness(state=state, registry=registry)
        return state, registry, flow

    async def _create_order(self, flow, **call_kwargs):
        """Run ``flow._try_collect_and_create_order`` with CPF hydration stubbed out."""
        with patch(self._HYDRATE_TARGET, new=self._fake_hydrate_mock_customer_from_cpf):
            return await flow._try_collect_and_create_order(**call_kwargs)

    async def test_order_flow_auto_lists_stock_on_first_purchase_message_when_budget_exists(self):
        """With CPF and budget already in memory, the first purchase message lists stock."""
        _, _, flow = self._build_flow(
            contexts={
                10: {
                    "generic_memory": {"cpf": "12345678909", "orcamento_max": 50000},
                    "last_stock_results": [],
                    "selected_vehicle": None,
                }
            }
        )

        response = await self._create_order(
            flow,
            message="Quero comprar um carro de 50 mil, meu CPF e 12345678909",
            user_id=10,
            extracted_fields={"cpf": "12345678909"},
            intents={},
            turn_decision=self._order_create_decision(),
        )

        self.assertIn("Encontrei 2 veiculo(s):", response)
        self.assertIn("Honda Civic 2021", response)

    async def test_order_flow_extracts_budget_from_message_when_llm_misses_it(self):
        """The budget is taken from the message text even with no extracted fields."""
        state, _, flow = self._build_flow(
            contexts={
                10: {
                    "generic_memory": {},
                    "shared_memory": {},
                    "last_stock_results": [],
                    "selected_vehicle": None,
                }
            }
        )

        response = await self._create_order(
            flow,
            message="Quero comprar um carro de 50 mil, meu CPF e 12345678909",
            user_id=10,
            extracted_fields={},
            intents={},
            turn_decision=self._order_create_decision(),
        )

        self.assertIn("Encontrei 2 veiculo(s):", response)
        self.assertEqual(state.get_user_context(10)["generic_memory"]["orcamento_max"], 50000)

    async def test_order_flow_extracts_cpf_from_followup_message_when_llm_misses_it(self):
        """A follow-up message carrying only the CPF still leads to a stock search."""
        _, registry, flow = self._build_flow(
            contexts={
                10: {
                    "generic_memory": {"orcamento_max": 50000},
                    "shared_memory": {"orcamento_max": 50000},
                    "last_stock_results": [],
                    "selected_vehicle": None,
                }
            },
            draft_payload={},
        )

        response = await self._create_order(
            flow,
            message="Meu CPF e 12345678909",
            user_id=10,
            extracted_fields={},
            intents={},
        )

        self.assertIn("Encontrei 2 veiculo(s):", response)
        self.assertEqual(registry.calls[0][0], "consultar_estoque")

    async def test_order_flow_lists_stock_from_budget_when_vehicle_is_missing(self):
        """An inventory request during a draft lists stock and stores the results."""
        _, registry, flow = self._build_flow(
            contexts={
                10: {
                    "generic_memory": {"cpf": "12345678909", "orcamento_max": 50000},
                    "last_stock_results": [],
                    "selected_vehicle": None,
                }
            },
            draft_payload={"cpf": "12345678909"},
        )

        response = await self._create_order(
            flow,
            message="liste os carros com esse valor em estoque",
            user_id=10,
            extracted_fields={},
            intents={},
            turn_decision={"intent": "inventory_search", "domain": "sales", "action": "call_tool"},
        )

        self.assertEqual(registry.calls[0][0], "consultar_estoque")
        self.assertIn("Encontrei 2 veiculo(s):", response)
        self.assertIn("Honda Civic 2021", response)
        self.assertEqual(len(flow._get_last_stock_results(user_id=10)), 2)

    async def test_order_flow_accepts_turn_decision_without_legacy_intents(self):
        """A turn decision with empty legacy intents is enough to place the order."""
        _, registry, flow = self._build_flow(
            contexts={
                10: {
                    "generic_memory": {"cpf": "12345678909"},
                    "last_stock_results": [
                        {"id": 1, "modelo": "Honda Civic 2021", "categoria": "sedan", "preco": 51524.0},
                    ],
                    "selected_vehicle": None,
                }
            }
        )

        response = await self._create_order(
            flow,
            message="esse",
            user_id=10,
            extracted_fields={"vehicle_id": 1},
            intents={},
            turn_decision=self._order_create_decision(),
        )

        self.assertEqual(len(registry.calls), 1)
        tool_name, arguments, tool_user_id = registry.calls[0]
        self.assertEqual(tool_name, "realizar_pedido")
        self.assertEqual(tool_user_id, 10)
        self.assertEqual(arguments["vehicle_id"], 1)
        self.assertEqual(arguments["cpf"], "12345678909")
        self.assertIn("Pedido criado com sucesso.", response)

    async def test_order_flow_accepts_model_intent_without_keyword_trigger(self):
        """The legacy ``order_create`` intent places the order without purchase keywords."""
        _, registry, flow = self._build_flow(
            contexts={
                10: {
                    "generic_memory": {"cpf": "12345678909"},
                    "last_stock_results": [
                        {"id": 1, "modelo": "Honda Civic 2021", "categoria": "sedan", "preco": 51524.0},
                    ],
                    "selected_vehicle": None,
                }
            }
        )

        response = await self._create_order(
            flow,
            message="esse",
            user_id=10,
            extracted_fields={"vehicle_id": 1},
            intents={"order_create": True},
        )

        self.assertEqual(len(registry.calls), 1)
        tool_name, arguments, tool_user_id = registry.calls[0]
        self.assertEqual(tool_name, "realizar_pedido")
        self.assertEqual(tool_user_id, 10)
        self.assertEqual(arguments["vehicle_id"], 1)
        self.assertEqual(arguments["cpf"], "12345678909")
        self.assertIn("Pedido criado com sucesso.", response)

    async def test_order_flow_requests_vehicle_selection_from_last_stock_results(self):
        """Without a chosen vehicle the flow asks the user to pick from the last listing."""
        _, registry, flow = self._build_flow(
            contexts={
                10: {
                    "generic_memory": {},
                    "last_stock_results": [
                        {"id": 1, "modelo": "Honda Civic 2021", "categoria": "sedan", "preco": 51524.0},
                        {"id": 2, "modelo": "Toyota Corolla 2020", "categoria": "hatch", "preco": 58476.0},
                    ],
                    "selected_vehicle": None,
                }
            }
        )

        # Called directly on purpose: this test never stubbed CPF hydration.
        response = await flow._try_collect_and_create_order(
            message="Quero fazer um pedido",
            user_id=10,
            extracted_fields={},
            intents={"order_create": True},
        )

        self.assertIn("escolha primeiro qual veiculo", response.lower())
        self.assertIn("Honda Civic 2021", response)
        self.assertEqual(registry.calls, [])

    async def test_order_flow_creates_order_with_selected_vehicle_from_list_index(self):
        """Replying "2" orders the second vehicle of the last listing."""
        _, registry, flow = self._build_flow(
            contexts={
                10: {
                    "generic_memory": {"cpf": "12345678909"},
                    "last_stock_results": [
                        {"id": 1, "modelo": "Honda Civic 2021", "categoria": "sedan", "preco": 51524.0},
                        {"id": 2, "modelo": "Toyota Corolla 2020", "categoria": "hatch", "preco": 58476.0},
                    ],
                    "selected_vehicle": None,
                }
            },
            draft_payload={"cpf": "12345678909"},
        )

        response = await self._create_order(
            flow,
            message="2",
            user_id=10,
            extracted_fields={},
            intents={},
        )

        self.assertEqual(len(registry.calls), 1)
        tool_name, arguments, tool_user_id = registry.calls[0]
        self.assertEqual(tool_name, "realizar_pedido")
        self.assertEqual(tool_user_id, 10)
        self.assertEqual(arguments["vehicle_id"], 2)
        self.assertEqual(arguments["cpf"], "12345678909")
        self.assertIn("Veiculo: Toyota Corolla 2020", response)

    async def test_order_flow_selection_uses_list_position_not_vehicle_id(self):
        """Replying "3" to a two-item list is not read as vehicle id 3; the list is shown again."""
        _, registry, flow = self._build_flow(
            contexts={
                10: {
                    "generic_memory": {"cpf": "12345678909"},
                    "last_stock_results": [
                        {"id": 3, "modelo": "Chevrolet Onix 2022", "categoria": "suv", "preco": 51809.0},
                        {"id": 7, "modelo": "Fiat Argo 2020", "categoria": "suv", "preco": 61857.0},
                    ],
                    "selected_vehicle": None,
                }
            },
            draft_payload={"cpf": "12345678909"},
        )

        response = await self._create_order(
            flow,
            message="3",
            user_id=10,
            extracted_fields={},
            intents={},
        )

        self.assertEqual(registry.calls, [])
        self.assertIn("escolha primeiro qual veiculo", response.lower())
        self.assertIn("1. Chevrolet Onix 2022", response)
        self.assertIn("2. Fiat Argo 2020", response)

    async def test_order_flow_keeps_draft_and_clears_retryable_field_on_tool_error(self):
        """A retryable tool error keeps the draft but drops the field the error names."""
        state, registry, flow = self._build_flow(
            contexts={
                10: {
                    "generic_memory": {"cpf": "12345678909"},
                    "last_stock_results": [],
                    "selected_vehicle": None,
                }
            },
            draft_payload={"cpf": "12345678909", "vehicle_id": 99},
        )
        registry.raise_http_exception = HTTPException(
            status_code=409,
            detail={
                "code": "vehicle_already_reserved",
                "message": "Este veiculo ja esta reservado e nao aparece mais no estoque disponivel.",
                "retryable": True,
                "field": "vehicle_id",
            },
        )

        response = await self._create_order(
            flow,
            message="quero esse carro",
            user_id=10,
            extracted_fields={},
            intents={},
        )

        draft = state.get_entry("pending_order_drafts", 10)
        self.assertIn("ja esta reservado", response)
        self.assertIsNotNone(draft)
        self.assertEqual(draft["payload"].get("cpf"), "12345678909")
        self.assertNotIn("vehicle_id", draft["payload"])

    async def test_order_flow_refreshes_stale_stock_results_when_budget_changes(self):
        """Stored results above the stated budget are replaced by a fresh search."""
        state, registry, flow = self._build_flow(
            contexts={
                10: {
                    "generic_memory": {"cpf": "12345678909", "orcamento_max": 45000},
                    "last_stock_results": [
                        {"id": 3, "modelo": "Chevrolet Onix 2022", "categoria": "suv", "preco": 51809.0},
                        {"id": 7, "modelo": "Fiat Argo 2020", "categoria": "suv", "preco": 61857.0},
                    ],
                    "selected_vehicle": {"id": 3, "modelo": "Chevrolet Onix 2022", "categoria": "suv", "preco": 51809.0},
                }
            }
        )

        response = await self._create_order(
            flow,
            message="Quero comprar um carro de 45 mil, meu CPF e 12345678909",
            user_id=10,
            extracted_fields={"cpf": "12345678909"},
            intents={},
            turn_decision=self._order_create_decision(),
        )

        context = state.get_user_context(10)
        self.assertEqual(registry.calls[0][0], "consultar_estoque")
        self.assertNotIn("Chevrolet Onix 2022", response)
        self.assertIsNone(context["selected_vehicle"])
        self.assertEqual(len(context["last_stock_results"]), 2)

    async def test_order_flow_refreshes_stale_stock_results_when_profile_changes(self):
        """Stored results of another category are replaced when a hatch is requested."""
        state, registry, flow = self._build_flow(
            contexts={
                10: {
                    "generic_memory": {"cpf": "12345678909", "orcamento_max": 50000, "perfil_veiculo": ["hatch"]},
                    "last_stock_results": [
                        {"id": 3, "modelo": "Chevrolet Onix 2022", "categoria": "suv", "preco": 48000.0},
                    ],
                    "selected_vehicle": {"id": 3, "modelo": "Chevrolet Onix 2022", "categoria": "suv", "preco": 48000.0},
                }
            }
        )

        await self._create_order(
            flow,
            message="Quero comprar um hatch de 50 mil, meu CPF e 12345678909",
            user_id=10,
            extracted_fields={"cpf": "12345678909"},
            intents={},
            turn_decision=self._order_create_decision(),
        )

        context = state.get_user_context(10)
        self.assertEqual(registry.calls[0][0], "consultar_estoque")
        self.assertIsNone(context["selected_vehicle"])
        self.assertTrue(
            all(item.get("categoria") != "suv" for item in context["last_stock_results"])
        )
class ReviewFlowDraftTests(unittest.IsolatedAsyncioTestCase):
    """Async tests for how the review-scheduling flow treats its pending draft."""

    async def test_review_flow_keeps_draft_and_clears_data_hora_on_retryable_error(self):
        """A retryable schedule conflict keeps the draft but drops ``data_hora``."""
        complete_payload = {
            "placa": "ABC1234",
            "data_hora": "2026-03-10T09:00:00-03:00",
            "modelo": "HB20",
            "ano": 2022,
            "km": 15000,
            "revisao_previa_concessionaria": True,
        }
        pending_draft = {
            "payload": complete_payload,
            "expires_at": datetime.utcnow() + timedelta(minutes=30),
        }
        state = FakeState(entries={"pending_review_drafts": {21: pending_draft}})

        # Every tool call fails with a retryable conflict on the requested time.
        schedule_conflict = HTTPException(
            status_code=409,
            detail={
                "code": "review_schedule_conflict",
                "message": "O horario solicitado esta ocupado.",
                "retryable": True,
                "field": "data_hora",
                "suggested_iso": "2026-03-10T09:30:00-03:00",
            },
        )
        registry = FakeRegistry()
        registry.raise_http_exception = schedule_conflict
        flow = ReviewFlowHarness(state=state, registry=registry)

        response = await flow._try_collect_and_schedule_review(
            message="agendar revisao",
            user_id=21,
            extracted_fields={},
            intents={},
            turn_decision={"intent": "review_schedule", "domain": "review", "action": "call_tool"},
        )

        remaining_draft = state.get_entry("pending_review_drafts", 21)
        self.assertIn("ocupado", response)
        self.assertIsNotNone(remaining_draft)
        self.assertEqual(remaining_draft["payload"].get("placa"), "ABC1234")
        self.assertNotIn("data_hora", remaining_draft["payload"])
# Run the tests in this module when the file is executed directly as a script.
if __name__ == "__main__":
    unittest.main()