You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
orquestrador/tests/test_turn_decision_contract.py

1226 lines
48 KiB
Python

import os
import unittest
os.environ.setdefault("DEBUG", "false")
from datetime import datetime, timedelta
from app.services.orchestration.conversation_policy import ConversationPolicy
from app.services.orchestration.entity_normalizer import EntityNormalizer
from app.services.orchestration.message_planner import MessagePlanner
from app.services.orchestration.orquestrador_service import OrquestradorService
class FakeLLM:
def __init__(self, responses):
self.responses = list(responses)
self.calls = 0
async def generate_response(self, message: str, tools):
self.calls += 1
if self.responses:
return self.responses.pop(0)
return {"response": "", "tool_call": None}
class FakeState:
def __init__(self, entries=None, contexts=None):
self.entries = entries or {}
self.contexts = contexts or {}
def get_entry(self, bucket: str, user_id: int | None, *, expire: bool = False):
if user_id is None:
return None
return self.entries.get(bucket, {}).get(user_id)
def set_entry(self, bucket: str, user_id: int | None, value: dict):
if user_id is None:
return
self.entries.setdefault(bucket, {})[user_id] = value
def pop_entry(self, bucket: str, user_id: int | None):
if user_id is None:
return None
return self.entries.get(bucket, {}).pop(user_id, None)
def get_user_context(self, user_id: int | None):
if user_id is None:
return None
return self.contexts.get(user_id)
def save_user_context(self, user_id: int | None, context: dict):
if user_id is None:
return
self.contexts[user_id] = context
class FakeToolExecutor:
def __init__(self, result=None):
self.result = result or {"ok": True}
self.calls = []
async def execute(self, tool_name: str, arguments: dict, user_id: int | None = None):
self.calls.append((tool_name, arguments, user_id))
if tool_name == "consultar_estoque" and arguments.get("preco_max") and float(arguments["preco_max"]) > 50000:
return [
{"id": 7, "modelo": "Hyundai HB20 2022", "categoria": "hatch", "preco": 54500.0},
{"id": 8, "modelo": "Chevrolet Onix 2023", "categoria": "hatch", "preco": 58900.0},
]
return self.result
def coerce_http_error(self, exc):
detail = exc.detail if isinstance(exc.detail, dict) else {}
return {
"code": detail.get("code", "tool_error"),
"message": detail.get("message", str(exc)),
"retryable": bool(detail.get("retryable", False)),
"field": detail.get("field"),
}
class FakePolicyService:
def __init__(self, state):
self.state = state
self.normalizer = EntityNormalizer()
def _get_user_context(self, user_id: int | None):
if user_id is None:
return None
return self.state.contexts.get(user_id)
def _save_user_context(self, user_id: int | None, context: dict | None) -> None:
if user_id is None or not isinstance(context, dict):
return
self.state.save_user_context(user_id, context)
def _new_tab_memory(self, user_id: int | None):
return {}
def _is_affirmative_message(self, text: str) -> bool:
normalized = self.normalizer.normalize_text(text).strip().rstrip(".!?,;:")
return normalized in {"sim", "pode", "ok", "confirmo", "aceito", "fechado", "pode sim"}
def _is_negative_message(self, text: str) -> bool:
normalized = self.normalizer.normalize_text(text).strip().rstrip(".!?,;:")
return normalized in {"nao", "nao quero"} or normalized.startswith("nao")
def _clear_user_conversation_state(self, user_id: int | None) -> None:
context = self._get_user_context(user_id)
if context:
context["pending_order_selection"] = None
async def handle_message(self, message: str, user_id: int | None = None) -> str:
return f"handled:{message}"
def _render_missing_review_fields_prompt(self, missing_fields: list[str]) -> str:
return "missing review"
def _render_missing_review_reschedule_fields_prompt(self, missing_fields: list[str]) -> str:
return "missing review reschedule"
def _render_missing_review_cancel_fields_prompt(self, missing_fields: list[str]) -> str:
return "missing review cancel"
def _render_review_reuse_question(self) -> str:
return "reuse review?"
def _render_missing_order_fields_prompt(self, missing_fields: list[str]) -> str:
return "missing order"
def _render_missing_cancel_order_fields_prompt(self, missing_fields: list[str]) -> str:
return "missing cancel order"
class TurnDecisionContractTests(unittest.IsolatedAsyncioTestCase):
async def test_extract_turn_decision_retries_once_and_returns_structured_payload(self):
llm = FakeLLM(
[
{"response": "nao eh json", "tool_call": None},
{
"response": """
{
"intent": "review_schedule",
"domain": "review",
"action": "ask_missing_fields",
"entities": {
"generic_memory": {},
"review_fields": {"placa": "abc1234", "data_hora": "10/03/2026 às 09:00"},
"review_management_fields": {},
"order_fields": {},
"cancel_order_fields": {}
},
"missing_fields": ["modelo", "ano", "km"],
"tool_name": null,
"tool_arguments": {},
"response_to_user": "Preciso do modelo, ano e quilometragem."
}
""",
"tool_call": None,
},
]
)
planner = MessagePlanner(llm=llm, normalizer=EntityNormalizer())
decision = await planner.extract_turn_decision("Quero agendar revisão amanhã às 09:00", user_id=7)
self.assertEqual(llm.calls, 2)
self.assertEqual(decision["intent"], "review_schedule")
self.assertEqual(decision["domain"], "review")
self.assertEqual(decision["action"], "ask_missing_fields")
self.assertEqual(decision["entities"]["review_fields"]["placa"], "ABC1234")
self.assertEqual(decision["entities"]["review_fields"]["data_hora"], "10/03/2026 às 09:00")
self.assertEqual(decision["missing_fields"], ["modelo", "ano", "km"])
def test_coerce_turn_decision_rejects_invalid_shape_with_fallback(self):
normalizer = EntityNormalizer()
decision = normalizer.coerce_turn_decision(
{
"intent": "valor_invalido",
"domain": "sales",
"action": "call_tool",
"entities": [],
}
)
self.assertEqual(decision["intent"], "general")
self.assertEqual(decision["domain"], "general")
self.assertEqual(decision["action"], "answer_user")
self.assertEqual(decision["entities"]["order_fields"], {})
def test_coerce_turn_decision_maps_order_aliases_from_model(self):
normalizer = EntityNormalizer()
decision = normalizer.coerce_turn_decision(
{
"intent": "place_order",
"domain": "sales",
"action": "answer_user",
"entities": {
"generic_memory": {"orcamento_max": "70000", "cpf": "12345678909"},
"review_fields": {},
"review_management_fields": {},
"order_fields": {},
"cancel_order_fields": {},
},
"missing_fields": [],
"selection_index": None,
"tool_name": None,
"tool_arguments": {},
"response_to_user": None,
}
)
self.assertEqual(decision["intent"], "order_create")
self.assertEqual(decision["domain"], "sales")
self.assertEqual(decision["action"], "answer_user")
self.assertEqual(decision["entities"]["generic_memory"]["orcamento_max"], 70000)
self.assertEqual(decision["entities"]["generic_memory"]["cpf"], "12345678909")
def test_coerce_turn_decision_converts_vehicle_alias_missing_field_into_order_collection(self):
normalizer = EntityNormalizer()
decision = normalizer.coerce_turn_decision(
{
"intent": "create_order",
"domain": "sales",
"action": "ask_missing_fields",
"entities": {
"generic_memory": {"orcamento_max": 70000},
"review_fields": {},
"review_management_fields": {},
"order_fields": {"cpf": "12345678909"},
"cancel_order_fields": {},
},
"missing_fields": ["modelo_carro"],
"selection_index": None,
"tool_name": None,
"tool_arguments": {},
"response_to_user": "Certo! Para qual modelo de carro voce gostaria de um orcamento de 70 mil?",
}
)
self.assertEqual(decision["intent"], "order_create")
self.assertEqual(decision["action"], "collect_order_create")
self.assertEqual(decision["missing_fields"], [])
self.assertIsNone(decision["response_to_user"])
def test_coerce_turn_decision_normalizes_cancel_order_tool_argument_aliases(self):
normalizer = EntityNormalizer()
decision = normalizer.coerce_turn_decision(
{
"intent": "order_cancel",
"domain": "sales",
"action": "call_tool",
"tool_name": "cancelar_pedido",
"tool_arguments": {
"order_id": "PED-20260310113756-DC1540",
"reason": "desisti da compra",
},
"entities": {
"generic_memory": {},
"review_fields": {},
"review_management_fields": {},
"order_fields": {},
"cancel_order_fields": {},
},
"missing_fields": [],
"response_to_user": None,
}
)
self.assertEqual(decision["tool_arguments"]["numero_pedido"], "PED-20260310113756-DC1540")
self.assertEqual(decision["tool_arguments"]["motivo"], "desisti da compra")
def test_coerce_turn_decision_normalizes_review_tool_name_alias(self):
normalizer = EntityNormalizer()
decision = normalizer.coerce_turn_decision(
{
"intent": "review_schedule",
"domain": "review",
"action": "call_tool",
"tool_name": "marcar_revisao",
"tool_arguments": {
"placa": "ABC1234",
"data_hora": "19/03/2026 09:00",
"modelo": "Corolla",
"ano": 2020,
"km": 30000,
"revisao_previa_concessionaria": True,
},
"entities": {
"generic_memory": {},
"review_fields": {},
"review_management_fields": {},
"order_fields": {},
"cancel_order_fields": {},
},
"missing_fields": [],
"response_to_user": None,
}
)
self.assertEqual(decision["tool_name"], "agendar_revisao")
self.assertEqual(decision["tool_arguments"]["placa"], "ABC1234")
def test_coerce_turn_decision_normalizes_review_schedule_tool_argument_aliases(self):
normalizer = EntityNormalizer()
decision = normalizer.coerce_turn_decision(
{
"intent": "review_schedule",
"domain": "review",
"action": "call_tool",
"tool_name": "agendar_revisao",
"tool_arguments": {
"placa_veiculo": "ABC1234",
"data": "20/03/2026 09:00",
"modelo_veiculo": "Corolla",
"ano_veiculo": 2020,
"quilometragem": 30000,
"revisao_previa": True,
},
"entities": {
"generic_memory": {},
"review_fields": {},
"review_management_fields": {},
"order_fields": {},
"cancel_order_fields": {},
},
"missing_fields": [],
"response_to_user": None,
}
)
self.assertEqual(decision["tool_arguments"]["placa"], "ABC1234")
self.assertEqual(decision["tool_arguments"]["data_hora"], "20/03/2026 09:00")
self.assertEqual(decision["tool_arguments"]["modelo"], "Corolla")
self.assertEqual(decision["tool_arguments"]["ano"], 2020)
self.assertEqual(decision["tool_arguments"]["km"], 30000)
self.assertTrue(decision["tool_arguments"]["revisao_previa_concessionaria"])
def test_coerce_turn_decision_downgrades_incomplete_review_schedule_tool_call_to_collection(self):
normalizer = EntityNormalizer()
decision = normalizer.coerce_turn_decision(
{
"intent": "review_schedule",
"domain": "review",
"action": "call_tool",
"tool_name": "agendar_revisao",
"tool_arguments": {
"placa_veiculo": "ABC1234",
"modelo_veiculo": "Corolla",
"ano_veiculo": 2020,
},
"entities": {
"generic_memory": {},
"review_fields": {},
"review_management_fields": {},
"order_fields": {},
"cancel_order_fields": {},
},
"missing_fields": [],
"response_to_user": None,
}
)
self.assertEqual(decision["action"], "collect_review_schedule")
self.assertIsNone(decision["tool_name"])
self.assertEqual(decision["tool_arguments"], {})
self.assertEqual(decision["entities"]["review_fields"]["placa"], "ABC1234")
self.assertEqual(decision["entities"]["review_fields"]["modelo"], "Corolla")
self.assertEqual(decision["entities"]["review_fields"]["ano"], 2020)
def test_coerce_turn_decision_downgrades_incomplete_review_schedule_tool_call_even_with_general_domain(self):
normalizer = EntityNormalizer()
decision = normalizer.coerce_turn_decision(
{
"intent": "review_schedule",
"domain": "general",
"action": "call_tool",
"tool_name": "agendar_revisao",
"tool_arguments": {
"placa_veiculo": "ABC1234",
"modelo_veiculo": "Corolla",
},
"entities": {
"generic_memory": {},
"review_fields": {},
"review_management_fields": {},
"order_fields": {},
"cancel_order_fields": {},
},
"missing_fields": [],
"response_to_user": None,
}
)
self.assertEqual(decision["action"], "collect_review_schedule")
self.assertIsNone(decision["tool_name"])
self.assertEqual(decision["tool_arguments"], {})
self.assertEqual(decision["entities"]["review_fields"]["placa"], "ABC1234")
self.assertEqual(decision["entities"]["review_fields"]["modelo"], "Corolla")
def test_coerce_turn_decision_normalizes_review_listing_tool_arguments(self):
normalizer = EntityNormalizer()
decision = normalizer.coerce_turn_decision(
{
"intent": "review_list",
"domain": "review",
"action": "call_tool",
"tool_name": "listar_agendamentos",
"tool_arguments": {
"vehicle_plate": "abc1234",
"schedule_status": "agendado",
"limit": 10,
"tipo": "revisao",
},
"entities": {
"generic_memory": {},
"review_fields": {},
"review_management_fields": {},
"order_fields": {},
"cancel_order_fields": {},
},
"missing_fields": [],
"response_to_user": None,
}
)
self.assertEqual(decision["tool_name"], "listar_agendamentos_revisao")
self.assertEqual(
decision["tool_arguments"],
{"placa": "ABC1234", "status": "agendado", "limite": 10},
)
def test_coerce_turn_decision_normalizes_review_management_tool_name_alias(self):
normalizer = EntityNormalizer()
decision = normalizer.coerce_turn_decision(
{
"intent": "review_cancel",
"domain": "review",
"action": "call_tool",
"tool_name": "cancelar_agendamento",
"tool_arguments": {
"protocolo": "REV-20260313-F754AF27",
},
"entities": {
"generic_memory": {},
"review_fields": {},
"review_management_fields": {},
"order_fields": {},
"cancel_order_fields": {},
},
"missing_fields": [],
"response_to_user": None,
}
)
self.assertEqual(decision["tool_name"], "cancelar_agendamento_revisao")
def test_coerce_turn_decision_downgrades_incomplete_cancel_order_tool_call_to_collection(self):
normalizer = EntityNormalizer()
decision = normalizer.coerce_turn_decision(
{
"intent": "order_cancel",
"domain": "sales",
"action": "call_tool",
"tool_name": "cancelar_pedido",
"tool_arguments": {
"order_id": "PED-20260310124202-5EF4E9",
},
"entities": {
"generic_memory": {},
"review_fields": {},
"review_management_fields": {},
"order_fields": {},
"cancel_order_fields": {},
},
"missing_fields": [],
"response_to_user": None,
}
)
self.assertEqual(decision["action"], "collect_order_cancel")
self.assertIsNone(decision["tool_name"])
self.assertEqual(decision["tool_arguments"], {})
self.assertEqual(decision["entities"]["cancel_order_fields"]["numero_pedido"], "PED-20260310124202-5EF4E9")
def test_coerce_turn_decision_rejects_missing_fields_without_response_payload(self):
normalizer = EntityNormalizer()
decision = normalizer.coerce_turn_decision(
{
"intent": "review_schedule",
"domain": "review",
"action": "ask_missing_fields",
"entities": {
"generic_memory": {},
"review_fields": {},
"review_management_fields": {},
"order_fields": {},
"cancel_order_fields": {},
},
"missing_fields": [],
"tool_name": None,
"tool_arguments": {},
"response_to_user": "",
}
)
self.assertEqual(decision["intent"], "general")
self.assertEqual(decision["action"], "answer_user")
def test_turn_decision_entities_do_not_rebuild_legacy_intents(self):
service = OrquestradorService.__new__(OrquestradorService)
service.normalizer = EntityNormalizer()
extracted = service._extracted_entities_from_turn_decision(
{
"intent": "order_create",
"domain": "sales",
"action": "collect_order_create",
"entities": {
"generic_memory": {"cpf": "12345678909"},
"review_fields": {},
"review_management_fields": {},
"order_fields": {"vehicle_id": 1},
"cancel_order_fields": {},
},
}
)
self.assertEqual(extracted["intents"], {})
self.assertEqual(extracted["order_fields"]["vehicle_id"], 1)
def test_turn_decision_entity_merge_preserves_generic_memory_from_previous_extraction(self):
service = OrquestradorService.__new__(OrquestradorService)
service.normalizer = EntityNormalizer()
service._empty_extraction_payload = service.normalizer.empty_extraction_payload
merged = service._merge_extracted_entities(
{
"generic_memory": {"orcamento_max": 70000},
"review_fields": {},
"review_management_fields": {},
"order_fields": {"cpf": "12345678909"},
"cancel_order_fields": {},
"intents": {},
},
{
"generic_memory": {},
"review_fields": {},
"review_management_fields": {},
"order_fields": {},
"cancel_order_fields": {},
"intents": {},
},
)
self.assertEqual(merged["generic_memory"]["orcamento_max"], 70000)
self.assertEqual(merged["order_fields"]["cpf"], "12345678909")
def test_entity_merge_can_enrich_message_plan_with_full_extraction(self):
service = OrquestradorService.__new__(OrquestradorService)
service.normalizer = EntityNormalizer()
service._empty_extraction_payload = service.normalizer.empty_extraction_payload
merged = service._merge_extracted_entities(
{
"generic_memory": {},
"review_fields": {},
"review_management_fields": {},
"order_fields": {"cpf": "12345678909"},
"cancel_order_fields": {},
"intents": {},
},
{
"generic_memory": {"orcamento_max": 70000},
"review_fields": {},
"review_management_fields": {},
"order_fields": {},
"cancel_order_fields": {},
"intents": {},
},
)
self.assertEqual(merged["generic_memory"]["orcamento_max"], 70000)
self.assertEqual(merged["order_fields"]["cpf"], "12345678909")
async def test_missing_sales_search_context_triggers_focused_llm_enrichment(self):
service = OrquestradorService.__new__(OrquestradorService)
service.normalizer = EntityNormalizer()
async def fake_extract_sales_search_context_with_llm(message: str, user_id: int | None):
return {"orcamento_max": 70000}
service._extract_sales_search_context_with_llm = fake_extract_sales_search_context_with_llm
result = await service._extract_missing_sales_search_context_with_llm(
message="Quero comprar um carro de 70 mil, meu CPF e 12345678909",
user_id=7,
turn_decision={"domain": "sales", "intent": "order_create", "action": "collect_order_create"},
extracted_entities={
"generic_memory": {},
"review_fields": {},
"review_management_fields": {},
"order_fields": {"cpf": "12345678909"},
"cancel_order_fields": {},
"intents": {},
},
)
self.assertEqual(result["orcamento_max"], 70000)
async def test_turn_decision_call_tool_executes_without_router(self):
service = OrquestradorService.__new__(OrquestradorService)
service.tool_executor = FakeToolExecutor(result={"numero_pedido": "PED-1", "status": "Ativo"})
service.llm = FakeLLM([])
service._capture_review_confirmation_suggestion = lambda **kwargs: None
service._capture_tool_result_context = lambda **kwargs: None
service._should_use_deterministic_response = lambda tool_name: True
service._fallback_format_tool_result = lambda tool_name, tool_result: f"{tool_name}:{tool_result['numero_pedido']}"
async def fake_render_tool_response_with_fallback(**kwargs):
return f"{kwargs['tool_name']}:{kwargs['tool_result']['numero_pedido']}"
service._render_tool_response_with_fallback = fake_render_tool_response_with_fallback
service._http_exception_detail = lambda exc: str(exc)
service._is_low_value_response = lambda text: False
async def finish(response: str, queue_notice: str | None = None) -> str:
return response if not queue_notice else f"{queue_notice}\n{response}"
response = await service._try_execute_business_tool_from_turn_decision(
message="quero fechar o pedido",
user_id=7,
turn_decision={
"action": "call_tool",
"tool_name": "realizar_pedido",
"tool_arguments": {"cpf": "12345678909", "vehicle_id": 1},
},
queue_notice=None,
finish=finish,
)
self.assertEqual(
service.tool_executor.calls,
[("realizar_pedido", {"cpf": "12345678909", "vehicle_id": 1}, 7)],
)
self.assertEqual(response, "realizar_pedido:PED-1")
self.assertEqual(service.llm.calls, 0)
async def test_turn_decision_cancel_order_uses_deterministic_response_without_result_llm(self):
service = OrquestradorService.__new__(OrquestradorService)
service.tool_executor = FakeToolExecutor(result={"numero_pedido": "PED-1", "status": "Cancelado", "motivo": "desisti"})
service.llm = FakeLLM([])
service._capture_review_confirmation_suggestion = lambda **kwargs: None
service._capture_tool_result_context = lambda **kwargs: None
service._should_use_deterministic_response = lambda tool_name: tool_name == "cancelar_pedido"
service._fallback_format_tool_result = lambda tool_name, tool_result: (
f"Pedido {tool_result['numero_pedido']} atualizado.\nStatus: {tool_result['status']}"
)
async def fake_render_tool_response_with_fallback(**kwargs):
return "nao deveria usar llm"
service._render_tool_response_with_fallback = fake_render_tool_response_with_fallback
service._http_exception_detail = lambda exc: str(exc)
service._is_low_value_response = lambda text: False
async def finish(response: str, queue_notice: str | None = None) -> str:
return response if not queue_notice else f"{queue_notice}\n{response}"
response = await service._try_execute_business_tool_from_turn_decision(
message="cancelar pedido",
user_id=7,
turn_decision={
"action": "call_tool",
"tool_name": "cancelar_pedido",
"tool_arguments": {"numero_pedido": "PED-1", "motivo": "desisti"},
},
queue_notice=None,
finish=finish,
)
self.assertEqual(response, "Pedido PED-1 atualizado.\nStatus: Cancelado")
self.assertEqual(service.llm.calls, 0)
async def test_empty_stock_search_suggests_nearby_options(self):
service = OrquestradorService.__new__(OrquestradorService)
service.normalizer = EntityNormalizer()
service.tool_executor = FakeToolExecutor(result=[])
service._get_user_context = lambda user_id: {
"generic_memory": {},
"shared_memory": {},
"last_stock_results": [],
"selected_vehicle": None,
}
service._capture_tool_result_context = lambda tool_name, tool_result, user_id: None
service._normalize_positive_number = service.normalizer.normalize_positive_number
response = await service._maybe_build_stock_suggestion_response(
tool_name="consultar_estoque",
arguments={"preco_max": 50000, "limite": 5},
tool_result=[],
user_id=5,
)
self.assertIn("Nao encontrei veiculos ate R$ 50.000.", response)
self.assertIn("Hyundai HB20 2022", response)
self.assertIn("Se quiser, responda com o numero da lista ou com o modelo.", response)
async def test_turn_decision_answer_user_can_short_circuit_router(self):
decision = {
"intent": "general",
"domain": "general",
"action": "answer_user",
"response_to_user": "Resposta direta do contrato.",
}
self.assertEqual(str(decision.get("action") or ""), "answer_user")
self.assertEqual(str(decision.get("response_to_user") or "").strip(), "Resposta direta do contrato.")
async def test_handle_message_prioritizes_order_flow_over_model_answer_for_purchase_intent(self):
state = FakeState(
contexts={
1: {
"active_domain": "general",
"generic_memory": {},
"shared_memory": {},
"order_queue": [],
"pending_order_selection": None,
"pending_switch": None,
"last_stock_results": [],
"selected_vehicle": None,
}
}
)
service = OrquestradorService.__new__(OrquestradorService)
service.state = state
service.normalizer = EntityNormalizer()
service._empty_extraction_payload = service.normalizer.empty_extraction_payload
service._log_turn_event = lambda *args, **kwargs: None
service._compose_order_aware_response = lambda response, user_id, queue_notice=None: response
async def fake_maybe_auto_advance_next_order(base_response: str, user_id: int | None):
return base_response
service._maybe_auto_advance_next_order = fake_maybe_auto_advance_next_order
service._upsert_user_context = lambda user_id: None
async def fake_extract_turn_decision(message: str, user_id: int | None):
return {
"intent": "order_create",
"domain": "sales",
"action": "answer_user",
"entities": {
"generic_memory": {"cpf": "12345678909", "orcamento_max": 70000},
"review_fields": {},
"review_management_fields": {},
"order_fields": {},
"cancel_order_fields": {},
},
"missing_fields": [],
"selection_index": None,
"tool_name": None,
"tool_arguments": {},
"response_to_user": "Certo! Para te ajudar a encontrar o carro ideal dentro do seu orcamento.",
}
service._extract_turn_decision_with_llm = fake_extract_turn_decision
async def fake_try_resolve_pending_order_selection(**kwargs):
return None
service._try_resolve_pending_order_selection = fake_try_resolve_pending_order_selection
async def fake_try_continue_queued_order(**kwargs):
return None
service._try_continue_queued_order = fake_try_continue_queued_order
async def fake_extract_message_plan(message: str, user_id: int | None):
return {
"orders": [
{
"domain": "sales",
"message": message,
"entities": service.normalizer.empty_extraction_payload(),
}
]
}
service._extract_message_plan_with_llm = fake_extract_message_plan
service._prepare_message_for_single_order = lambda message, user_id, routing_plan=None: (message, None, None)
service._resolve_entities_for_message_plan = lambda message_plan, routed_message: service.normalizer.empty_extraction_payload()
async def fake_extract_entities(message: str, user_id: int | None):
return {
"generic_memory": {"cpf": "12345678909", "orcamento_max": 70000},
"review_fields": {},
"review_management_fields": {},
"order_fields": {},
"cancel_order_fields": {},
"intents": {},
}
service._extract_entities_with_llm = fake_extract_entities
async def fake_extract_missing_sales_search_context_with_llm(**kwargs):
return {}
service._extract_missing_sales_search_context_with_llm = fake_extract_missing_sales_search_context_with_llm
service._domain_from_intents = lambda intents: "general"
service._handle_context_switch = lambda **kwargs: None
service._update_active_domain = lambda **kwargs: None
async def fake_try_execute_orchestration_control_tool(**kwargs):
return None
service._try_execute_orchestration_control_tool = fake_try_execute_orchestration_control_tool
async def fake_try_execute_business_tool_from_turn_decision(**kwargs):
return None
service._try_execute_business_tool_from_turn_decision = fake_try_execute_business_tool_from_turn_decision
async def fake_try_handle_review_management(**kwargs):
return None
service._try_handle_review_management = fake_try_handle_review_management
async def fake_try_confirm_pending_review(**kwargs):
return None
service._try_confirm_pending_review = fake_try_confirm_pending_review
async def fake_try_collect_and_schedule_review(**kwargs):
return None
service._try_collect_and_schedule_review = fake_try_collect_and_schedule_review
async def fake_try_collect_and_cancel_order(**kwargs):
return None
service._try_collect_and_cancel_order = fake_try_collect_and_cancel_order
async def fake_try_collect_and_create_order(**kwargs):
return "Encontrei 2 veiculo(s):\n1. Hyundai HB20 2022"
service._try_collect_and_create_order = fake_try_collect_and_create_order
response = await service.handle_message(
"Quero comprar um carro de 70 mil, meu CPF e 12345678909",
user_id=1,
)
self.assertIn("Encontrei 2 veiculo(s):", response)
def test_should_prioritize_review_flow_when_review_draft_is_open(self):
state = FakeState(
entries={
"pending_review_drafts": {
1: {
"payload": {"placa": "ABC1269"},
"expires_at": datetime.utcnow() + timedelta(minutes=15),
}
}
}
)
service = OrquestradorService.__new__(OrquestradorService)
service.state = state
service.normalizer = EntityNormalizer()
prioritized = service._should_prioritize_review_flow(
turn_decision={"intent": "general", "domain": "general", "action": "answer_user"},
extracted_entities={
"generic_memory": {},
"review_fields": {},
"review_management_fields": {},
"order_fields": {},
"cancel_order_fields": {},
"intents": {},
},
user_id=1,
)
self.assertTrue(prioritized)
def test_should_prioritize_review_flow_when_active_domain_is_review(self):
state = FakeState(
contexts={
1: {
"active_domain": "review",
"generic_memory": {},
"shared_memory": {},
"order_queue": [],
"pending_order_selection": None,
"pending_switch": None,
"last_stock_results": [],
"selected_vehicle": None,
"expires_at": datetime.utcnow() + timedelta(minutes=15),
}
}
)
service = OrquestradorService.__new__(OrquestradorService)
service.state = state
service.normalizer = EntityNormalizer()
prioritized = service._should_prioritize_review_flow(
turn_decision={"intent": "general", "domain": "general", "action": "answer_user"},
extracted_entities={
"generic_memory": {},
"review_fields": {},
"review_management_fields": {},
"order_fields": {},
"cancel_order_fields": {},
"intents": {},
},
user_id=1,
)
self.assertTrue(prioritized)
async def test_handle_message_prioritizes_review_flow_over_model_answer_for_followup(self):
state = FakeState(
entries={
"pending_review_drafts": {
1: {
"payload": {"placa": "ABC1269"},
"expires_at": datetime.utcnow() + timedelta(minutes=15),
}
}
},
contexts={
1: {
"active_domain": "review",
"generic_memory": {"placa": "ABC1269"},
"shared_memory": {"placa": "ABC1269"},
"order_queue": [],
"pending_order_selection": None,
"pending_switch": None,
"last_stock_results": [],
"selected_vehicle": None,
}
},
)
service = OrquestradorService.__new__(OrquestradorService)
service.state = state
service.normalizer = EntityNormalizer()
service._empty_extraction_payload = service.normalizer.empty_extraction_payload
service._log_turn_event = lambda *args, **kwargs: None
service._compose_order_aware_response = lambda response, user_id, queue_notice=None: response
async def fake_maybe_auto_advance_next_order(base_response: str, user_id: int | None):
return base_response
service._maybe_auto_advance_next_order = fake_maybe_auto_advance_next_order
service._upsert_user_context = lambda user_id: None
async def fake_extract_turn_decision(message: str, user_id: int | None):
return {
"intent": "general",
"domain": "general",
"action": "answer_user",
"entities": {
"generic_memory": {},
"review_fields": {},
"review_management_fields": {},
"order_fields": {},
"cancel_order_fields": {},
},
"missing_fields": [],
"selection_index": None,
"tool_name": None,
"tool_arguments": {},
"response_to_user": "Para que tipo de compromisso voce gostaria de marcar amanha as 16 horas?",
}
service._extract_turn_decision_with_llm = fake_extract_turn_decision
async def fake_try_handle_immediate_context_reset(**kwargs):
return None
service._try_handle_immediate_context_reset = fake_try_handle_immediate_context_reset
async def fake_try_resolve_pending_order_selection(**kwargs):
return None
service._try_resolve_pending_order_selection = fake_try_resolve_pending_order_selection
async def fake_try_continue_queued_order(**kwargs):
return None
service._try_continue_queued_order = fake_try_continue_queued_order
async def fake_extract_message_plan(message: str, user_id: int | None):
return {
"orders": [
{
"domain": "review",
"message": message,
"entities": service.normalizer.empty_extraction_payload(),
}
]
}
service._extract_message_plan_with_llm = fake_extract_message_plan
service._prepare_message_for_single_order = lambda message, user_id, routing_plan=None: (message, None, None)
service._resolve_entities_for_message_plan = lambda message_plan, routed_message: service.normalizer.empty_extraction_payload()
async def fake_extract_entities(message: str, user_id: int | None):
return {
"generic_memory": {},
"review_fields": {"data_hora": "13/03/2026 16:00"},
"review_management_fields": {},
"order_fields": {},
"cancel_order_fields": {},
"intents": {},
}
service._extract_entities_with_llm = fake_extract_entities
async def fake_extract_missing_sales_search_context_with_llm(**kwargs):
return {}
service._extract_missing_sales_search_context_with_llm = fake_extract_missing_sales_search_context_with_llm
service._domain_from_intents = lambda intents: "general"
service._handle_context_switch = lambda **kwargs: None
service._update_active_domain = lambda **kwargs: None
async def fake_try_execute_orchestration_control_tool(**kwargs):
return None
service._try_execute_orchestration_control_tool = fake_try_execute_orchestration_control_tool
async def fake_try_execute_business_tool_from_turn_decision(**kwargs):
return None
service._try_execute_business_tool_from_turn_decision = fake_try_execute_business_tool_from_turn_decision
async def fake_try_handle_review_management(**kwargs):
return None
service._try_handle_review_management = fake_try_handle_review_management
async def fake_try_confirm_pending_review(**kwargs):
return None
service._try_confirm_pending_review = fake_try_confirm_pending_review
async def fake_try_collect_and_schedule_review(**kwargs):
return "Para agendar sua revisao, preciso dos dados abaixo:\n- o modelo do veiculo"
service._try_collect_and_schedule_review = fake_try_collect_and_schedule_review
async def fake_try_collect_and_cancel_order(**kwargs):
return None
service._try_collect_and_cancel_order = fake_try_collect_and_cancel_order
async def fake_try_handle_order_listing(**kwargs):
return None
service._try_handle_order_listing = fake_try_handle_order_listing
async def fake_try_collect_and_create_order(**kwargs):
return None
service._try_collect_and_create_order = fake_try_collect_and_create_order
response = await service.handle_message(
"Eu gostaria de marcar amanha as 16 horas",
user_id=1,
)
self.assertIn("o modelo do veiculo", response)
def test_should_prioritize_order_flow_when_cancel_draft_is_open(self):
state = FakeState(
entries={
"pending_cancel_order_drafts": {
1: {
"payload": {"numero_pedido": "PED-202603101204814-6ED33A"},
"expires_at": datetime.utcnow() + timedelta(minutes=15),
}
}
}
)
service = OrquestradorService.__new__(OrquestradorService)
service.state = state
service.normalizer = EntityNormalizer()
prioritized = service._should_prioritize_order_flow(
turn_decision={"intent": "general", "domain": "general", "action": "answer_user"},
extracted_entities={
"generic_memory": {},
"review_fields": {},
"review_management_fields": {},
"order_fields": {},
"cancel_order_fields": {},
"intents": {},
},
user_id=1,
)
self.assertTrue(prioritized)
async def test_pending_order_selection_prefers_turn_decision_domain(self):
state = FakeState(
contexts={
9: {
"pending_order_selection": {
"orders": [
{"domain": "review", "message": "agendar revisao", "memory_seed": {}},
{"domain": "sales", "message": "fazer pedido", "memory_seed": {}},
],
"expires_at": datetime.utcnow() + timedelta(minutes=15),
},
"order_queue": [],
"active_domain": "general",
"generic_memory": {},
}
}
)
policy = ConversationPolicy(service=FakePolicyService(state))
response = await policy.try_resolve_pending_order_selection(
message="quero comprar",
user_id=9,
turn_decision={"domain": "sales", "intent": "order_create", "action": "collect_order_create"},
)
self.assertIn("Vou comecar por: Venda: fazer pedido", response)
async def test_pending_order_selection_prefers_turn_decision_selection_index(self):
state = FakeState(
contexts={
9: {
"pending_order_selection": {
"orders": [
{"domain": "review", "message": "agendar revisao", "memory_seed": {}},
{"domain": "sales", "message": "fazer pedido", "memory_seed": {}},
],
"expires_at": datetime.utcnow() + timedelta(minutes=15),
},
"order_queue": [],
"active_domain": "general",
"generic_memory": {},
}
}
)
policy = ConversationPolicy(service=FakePolicyService(state))
response = await policy.try_resolve_pending_order_selection(
message="esse",
user_id=9,
turn_decision={"domain": "general", "intent": "general", "action": "answer_user", "selection_index": 1},
)
self.assertIn("Vou comecar por: Venda: fazer pedido", response)
async def test_try_continue_queue_prefers_turn_decision_action(self):
state = FakeState(
contexts={
9: {
"pending_switch": {
"target_domain": "sales",
"queued_message": "fazer pedido",
"memory_seed": {"cpf": "12345678909"},
"expires_at": datetime.utcnow() + timedelta(minutes=15),
},
"active_domain": "general",
"generic_memory": {},
"pending_order_selection": None,
}
}
)
service = FakePolicyService(state)
policy = ConversationPolicy(service=service)
policy.apply_domain_switch = lambda user_id, target_domain: service._get_user_context(user_id).update(
{"active_domain": target_domain, "pending_switch": None}
)
response = await policy.try_continue_queued_order(
message="ok",
user_id=9,
turn_decision={"action": "continue_queue", "intent": "queue_continue", "domain": "sales"},
)
self.assertIn("Agora, sobre a compra do veiculo:", response)
def test_handle_context_switch_prefers_turn_decision_domain_confirmation(self):
state = FakeState(
contexts={
9: {
"pending_switch": {
"target_domain": "review",
"expires_at": datetime.utcnow() + timedelta(minutes=15),
},
"active_domain": "sales",
"generic_memory": {},
"pending_order_selection": None,
}
}
)
service = FakePolicyService(state)
policy = ConversationPolicy(service=service)
policy.apply_domain_switch = lambda user_id, target_domain: service._get_user_context(user_id).update(
{"active_domain": target_domain, "pending_switch": None}
)
response = policy.handle_context_switch(
message="quero revisar",
user_id=9,
target_domain_hint="review",
turn_decision={"domain": "review", "intent": "review_schedule", "action": "collect_review_schedule"},
)
self.assertEqual(response, "Certo, contexto anterior encerrado. Vamos seguir com agendamento de revisao.")
if __name__ == "__main__":
unittest.main()