"""Contract tests for turn-decision handling in the orchestration layer.

Exercises ``MessagePlanner``, ``EntityNormalizer``, ``OrquestradorService`` and
``ConversationPolicy`` against small in-memory fakes defined in this module.
"""

import os
import unittest

# Deliberately executed before the ``app`` imports below.
# NOTE(review): presumably the app settings read DEBUG at import time — confirm.
os.environ.setdefault("DEBUG", "false")

from datetime import datetime, timedelta, timezone  # noqa: E402

from app.services.orchestration.conversation_policy import ConversationPolicy  # noqa: E402
from app.services.orchestration.entity_normalizer import EntityNormalizer  # noqa: E402
from app.services.orchestration.message_planner import MessagePlanner  # noqa: E402
from app.services.orchestration.orquestrador_service import OrquestradorService  # noqa: E402


def _naive_utc_now() -> datetime:
    """Return the current UTC time as a naive ``datetime``.

    Produces the same value as the deprecated ``datetime.utcnow()``. The
    result is kept naive on purpose: the code under test is not visible here
    and may compare ``expires_at`` against naive timestamps.
    """
    return datetime.now(timezone.utc).replace(tzinfo=None)


class FakeLLM:
    """LLM stub that replays a fixed list of responses and counts calls."""

    def __init__(self, responses):
        self.responses = list(responses)  # copied so the caller's list is untouched
        self.calls = 0

    async def generate_response(self, message: str, tools):
        """Return the next scripted response, or an empty one when exhausted."""
        self.calls += 1
        if self.responses:
            return self.responses.pop(0)
        return {"response": "", "tool_call": None}


class FakeState:
    """In-memory state holder exposing bucketed entries and user contexts."""

    def __init__(self, entries=None, contexts=None):
        # ``is None`` (not ``or``) so an explicitly supplied empty dict is
        # kept and stays shared with the caller.
        self.entries = {} if entries is None else entries
        self.contexts = {} if contexts is None else contexts

    def get_entry(self, bucket: str, user_id: int | None, *, expire: bool = False):
        """Return the entry stored for ``user_id`` in ``bucket``, if any.

        ``expire`` is accepted but ignored by this fake.
        """
        if user_id is None:
            return None
        return self.entries.get(bucket, {}).get(user_id)

    def set_entry(self, bucket: str, user_id: int | None, value: dict):
        """Store ``value`` for ``user_id`` in ``bucket``; no-op without a user."""
        if user_id is None:
            return
        self.entries.setdefault(bucket, {})[user_id] = value

    def pop_entry(self, bucket: str, user_id: int | None):
        """Remove and return the entry for ``user_id`` in ``bucket``, if any."""
        if user_id is None:
            return None
        return self.entries.get(bucket, {}).pop(user_id, None)


class FakeToolExecutor:
    """Tool executor stub that records every call and returns a canned result."""

    def __init__(self, result=None):
        # ``is None`` (not ``or``): a falsy canned result such as ``[]`` must
        # be returned as-is instead of being replaced by the default payload.
        self.result = {"ok": True} if result is None else result
        self.calls = []

    async def execute(self, tool_name: str, arguments: dict, user_id: int | None = None):
        """Record the call and return the canned result.

        A ``consultar_estoque`` call whose ``preco_max`` is above 50000
        returns two fixed vehicles instead of the canned result.
        """
        self.calls.append((tool_name, arguments, user_id))
        if (
            tool_name == "consultar_estoque"
            and arguments.get("preco_max")
            and float(arguments["preco_max"]) > 50000
        ):
            return [
                {"id": 7, "modelo": "Hyundai HB20 2022", "categoria": "hatch", "preco": 54500.0},
                {"id": 8, "modelo": "Chevrolet Onix 2023", "categoria": "hatch", "preco": 58900.0},
            ]
        return self.result

    def coerce_http_error(self, exc):
        """Flatten ``exc.detail`` into a code/message/retryable/field dict."""
        detail = exc.detail if isinstance(exc.detail, dict) else {}
        return {
            "code": detail.get("code", "tool_error"),
            "message": detail.get("message", str(exc)),
            "retryable": bool(detail.get("retryable", False)),
            "field": detail.get("field"),
        }


class FakePolicyService:
    """Minimal service object handed to ``ConversationPolicy`` in these tests."""

    def __init__(self, state):
        self.state = state
        self.normalizer = EntityNormalizer()

    def _get_user_context(self, user_id: int | None):
        if user_id is None:
            return None
        return self.state.contexts.get(user_id)

    def _new_tab_memory(self, user_id: int | None):
        return {}

    def _normalize_reply(self, text: str) -> str:
        """Normalize ``text`` and drop surrounding blanks and trailing punctuation."""
        return self.normalizer.normalize_text(text).strip().rstrip(".!?,;:")

    def _is_affirmative_message(self, text: str) -> bool:
        return self._normalize_reply(text) in {
            "sim",
            "pode",
            "ok",
            "confirmo",
            "aceito",
            "fechado",
            "pode sim",
        }

    def _is_negative_message(self, text: str) -> bool:
        normalized = self._normalize_reply(text)
        return normalized in {"nao", "nao quero"} or normalized.startswith("nao")

    def _clear_user_conversation_state(self, user_id: int | None) -> None:
        context = self._get_user_context(user_id)
        if context:
            context["pending_order_selection"] = None

    async def handle_message(self, message: str, user_id: int | None = None) -> str:
        return f"handled:{message}"

    def _render_missing_review_fields_prompt(self, missing_fields: list[str]) -> str:
        return "missing review"

    def _render_missing_review_reschedule_fields_prompt(self, missing_fields: list[str]) -> str:
        return "missing review reschedule"

    def _render_missing_review_cancel_fields_prompt(self, missing_fields: list[str]) -> str:
        return "missing review cancel"

    def _render_review_reuse_question(self) -> str:
        return "reuse review?"

    def _render_missing_order_fields_prompt(self, missing_fields: list[str]) -> str:
        return "missing order"

    def _render_missing_cancel_order_fields_prompt(self, missing_fields: list[str]) -> str:
        return "missing cancel order"


def _pending_order_selection_state() -> FakeState:
    """Build state where user 9 must choose between a review and a sales order."""
    return FakeState(
        contexts={
            9: {
                "pending_order_selection": {
                    "orders": [
                        {"domain": "review", "message": "agendar revisao", "memory_seed": {}},
                        {"domain": "sales", "message": "fazer pedido", "memory_seed": {}},
                    ],
                    "expires_at": _naive_utc_now() + timedelta(minutes=15),
                },
                "order_queue": [],
                "active_domain": "general",
                "generic_memory": {},
            }
        }
    )


def _stub_domain_switch(policy, service) -> None:
    """Replace ``policy.apply_domain_switch`` with an in-memory context update."""

    def apply_domain_switch(user_id, target_domain):
        service._get_user_context(user_id).update(
            {"active_domain": target_domain, "pending_switch": None}
        )

    policy.apply_domain_switch = apply_domain_switch


class TurnDecisionContractTests(unittest.IsolatedAsyncioTestCase):
    """Checks how each orchestration component consumes a turn decision."""

    async def test_extract_turn_decision_retries_once_and_returns_structured_payload(self):
        llm = FakeLLM(
            [
                # First reply is not JSON, so the planner has to ask again.
                {"response": "nao eh json", "tool_call": None},
                {
                    # Adjacent literals concatenate to the exact payload text,
                    # including its leading and trailing space.
                    "response": (
                        ' { "intent": "review_schedule", "domain": "review",'
                        ' "action": "ask_missing_fields", "entities": {'
                        ' "generic_memory": {},'
                        ' "review_fields": {"placa": "abc1234", "data_hora": "10/03/2026 às 09:00"},'
                        ' "review_management_fields": {}, "order_fields": {},'
                        ' "cancel_order_fields": {} },'
                        ' "missing_fields": ["modelo", "ano", "km"], "tool_name": null,'
                        ' "tool_arguments": {},'
                        ' "response_to_user": "Preciso do modelo, ano e quilometragem." } '
                    ),
                    "tool_call": None,
                },
            ]
        )
        planner = MessagePlanner(llm=llm, normalizer=EntityNormalizer())

        decision = await planner.extract_turn_decision(
            "Quero agendar revisão amanhã às 09:00", user_id=7
        )

        self.assertEqual(llm.calls, 2)
        self.assertEqual(decision["intent"], "review_schedule")
        self.assertEqual(decision["domain"], "review")
        self.assertEqual(decision["action"], "ask_missing_fields")
        self.assertEqual(decision["entities"]["review_fields"]["placa"], "ABC1234")
        self.assertEqual(
            decision["entities"]["review_fields"]["data_hora"], "10/03/2026 às 09:00"
        )
        self.assertEqual(decision["missing_fields"], ["modelo", "ano", "km"])

    def test_coerce_turn_decision_rejects_invalid_shape_with_fallback(self):
        normalizer = EntityNormalizer()

        decision = normalizer.coerce_turn_decision(
            {
                "intent": "valor_invalido",
                "domain": "sales",
                "action": "call_tool",
                "entities": [],
            }
        )

        self.assertEqual(decision["intent"], "general")
        self.assertEqual(decision["domain"], "general")
        self.assertEqual(decision["action"], "answer_user")
        self.assertEqual(decision["entities"]["order_fields"], {})

    def test_coerce_turn_decision_rejects_missing_fields_without_response_payload(self):
        normalizer = EntityNormalizer()

        decision = normalizer.coerce_turn_decision(
            {
                "intent": "review_schedule",
                "domain": "review",
                "action": "ask_missing_fields",
                "entities": {
                    "generic_memory": {},
                    "review_fields": {},
                    "review_management_fields": {},
                    "order_fields": {},
                    "cancel_order_fields": {},
                },
                "missing_fields": [],
                "tool_name": None,
                "tool_arguments": {},
                "response_to_user": "",
            }
        )

        self.assertEqual(decision["intent"], "general")
        self.assertEqual(decision["action"], "answer_user")

    def test_turn_decision_entities_do_not_rebuild_legacy_intents(self):
        # ``__new__`` skips ``__init__``; only the attributes used are set.
        service = OrquestradorService.__new__(OrquestradorService)
        service.normalizer = EntityNormalizer()

        extracted = service._extracted_entities_from_turn_decision(
            {
                "intent": "order_create",
                "domain": "sales",
                "action": "collect_order_create",
                "entities": {
                    "generic_memory": {"cpf": "12345678909"},
                    "review_fields": {},
                    "review_management_fields": {},
                    "order_fields": {"vehicle_id": 1},
                    "cancel_order_fields": {},
                },
            }
        )

        self.assertEqual(extracted["intents"], {})
        self.assertEqual(extracted["order_fields"]["vehicle_id"], 1)

    def test_turn_decision_entity_merge_preserves_generic_memory_from_previous_extraction(self):
        service = OrquestradorService.__new__(OrquestradorService)
        service.normalizer = EntityNormalizer()
        service._empty_extraction_payload = service.normalizer.empty_extraction_payload

        merged = service._merge_extracted_entities(
            {
                "generic_memory": {"orcamento_max": 70000},
                "review_fields": {},
                "review_management_fields": {},
                "order_fields": {"cpf": "12345678909"},
                "cancel_order_fields": {},
                "intents": {},
            },
            {
                "generic_memory": {},
                "review_fields": {},
                "review_management_fields": {},
                "order_fields": {},
                "cancel_order_fields": {},
                "intents": {},
            },
        )

        self.assertEqual(merged["generic_memory"]["orcamento_max"], 70000)
        self.assertEqual(merged["order_fields"]["cpf"], "12345678909")

    def test_entity_merge_can_enrich_message_plan_with_full_extraction(self):
        service = OrquestradorService.__new__(OrquestradorService)
        service.normalizer = EntityNormalizer()
        service._empty_extraction_payload = service.normalizer.empty_extraction_payload

        merged = service._merge_extracted_entities(
            {
                "generic_memory": {},
                "review_fields": {},
                "review_management_fields": {},
                "order_fields": {"cpf": "12345678909"},
                "cancel_order_fields": {},
                "intents": {},
            },
            {
                "generic_memory": {"orcamento_max": 70000},
                "review_fields": {},
                "review_management_fields": {},
                "order_fields": {},
                "cancel_order_fields": {},
                "intents": {},
            },
        )

        self.assertEqual(merged["generic_memory"]["orcamento_max"], 70000)
        self.assertEqual(merged["order_fields"]["cpf"], "12345678909")

    async def test_missing_sales_search_context_triggers_focused_llm_enrichment(self):
        service = OrquestradorService.__new__(OrquestradorService)
        service.normalizer = EntityNormalizer()

        async def fake_extract_sales_search_context_with_llm(message: str, user_id: int | None):
            return {"orcamento_max": 70000}

        service._extract_sales_search_context_with_llm = fake_extract_sales_search_context_with_llm

        result = await service._extract_missing_sales_search_context_with_llm(
            message="Quero comprar um carro de 70 mil, meu CPF e 12345678909",
            user_id=7,
            turn_decision={
                "domain": "sales",
                "intent": "order_create",
                "action": "collect_order_create",
            },
            extracted_entities={
                "generic_memory": {},
                "review_fields": {},
                "review_management_fields": {},
                "order_fields": {"cpf": "12345678909"},
                "cancel_order_fields": {},
                "intents": {},
            },
        )

        self.assertEqual(result["orcamento_max"], 70000)

    async def test_turn_decision_call_tool_executes_without_router(self):
        service = OrquestradorService.__new__(OrquestradorService)
        service.tool_executor = FakeToolExecutor(
            result={"numero_pedido": "PED-1", "status": "Ativo"}
        )
        service.llm = FakeLLM([])
        service._capture_review_confirmation_suggestion = lambda **kwargs: None
        service._capture_tool_result_context = lambda **kwargs: None
        service._should_use_deterministic_response = lambda tool_name: True
        service._fallback_format_tool_result = (
            lambda tool_name, tool_result: f"{tool_name}:{tool_result['numero_pedido']}"
        )

        async def fake_render_tool_response_with_fallback(**kwargs):
            return f"{kwargs['tool_name']}:{kwargs['tool_result']['numero_pedido']}"

        service._render_tool_response_with_fallback = fake_render_tool_response_with_fallback
        service._http_exception_detail = lambda exc: str(exc)
        service._is_low_value_response = lambda text: False

        async def finish(response: str, queue_notice: str | None = None) -> str:
            return response if not queue_notice else f"{queue_notice}\n{response}"

        response = await service._try_execute_business_tool_from_turn_decision(
            message="quero fechar o pedido",
            user_id=7,
            turn_decision={
                "action": "call_tool",
                "tool_name": "realizar_pedido",
                "tool_arguments": {"cpf": "12345678909", "vehicle_id": 1},
            },
            queue_notice=None,
            finish=finish,
        )

        self.assertEqual(
            service.tool_executor.calls,
            [("realizar_pedido", {"cpf": "12345678909", "vehicle_id": 1}, 7)],
        )
        self.assertEqual(response, "realizar_pedido:PED-1")
        # The tool path must not fall back to the LLM.
        self.assertEqual(service.llm.calls, 0)

    async def test_empty_stock_search_suggests_nearby_options(self):
        service = OrquestradorService.__new__(OrquestradorService)
        service.normalizer = EntityNormalizer()
        service.tool_executor = FakeToolExecutor(result=[])
        service._get_user_context = lambda user_id: {
            "generic_memory": {},
            "shared_memory": {},
            "last_stock_results": [],
            "selected_vehicle": None,
        }
        service._capture_tool_result_context = lambda tool_name, tool_result, user_id: None
        service._normalize_positive_number = service.normalizer.normalize_positive_number

        response = await service._maybe_build_stock_suggestion_response(
            tool_name="consultar_estoque",
            arguments={"preco_max": 50000, "limite": 5},
            tool_result=[],
            user_id=5,
        )

        self.assertIn("Nao encontrei veiculos ate R$ 50.000.", response)
        self.assertIn("Hyundai HB20 2022", response)
        self.assertIn("Se quiser, responda com o numero da lista ou com o modelo.", response)

    async def test_turn_decision_answer_user_can_short_circuit_router(self):
        # NOTE(review): this test only inspects the literal built below and
        # never calls production code, so it cannot fail on a regression.
        # Consider driving the real short-circuit entry point instead.
        decision = {
            "intent": "general",
            "domain": "general",
            "action": "answer_user",
            "response_to_user": "Resposta direta do contrato.",
        }

        self.assertEqual(str(decision.get("action") or ""), "answer_user")
        self.assertEqual(
            str(decision.get("response_to_user") or "").strip(),
            "Resposta direta do contrato.",
        )

    async def test_pending_order_selection_prefers_turn_decision_domain(self):
        policy = ConversationPolicy(service=FakePolicyService(_pending_order_selection_state()))

        response = await policy.try_resolve_pending_order_selection(
            message="quero comprar",
            user_id=9,
            turn_decision={
                "domain": "sales",
                "intent": "order_create",
                "action": "collect_order_create",
            },
        )

        self.assertIn("Vou comecar por: Venda: fazer pedido", response)

    async def test_pending_order_selection_prefers_turn_decision_selection_index(self):
        policy = ConversationPolicy(service=FakePolicyService(_pending_order_selection_state()))

        response = await policy.try_resolve_pending_order_selection(
            message="esse",
            user_id=9,
            turn_decision={
                "domain": "general",
                "intent": "general",
                "action": "answer_user",
                "selection_index": 1,
            },
        )

        self.assertIn("Vou comecar por: Venda: fazer pedido", response)

    async def test_try_continue_queue_prefers_turn_decision_action(self):
        state = FakeState(
            contexts={
                9: {
                    "pending_switch": {
                        "target_domain": "sales",
                        "queued_message": "fazer pedido",
                        "memory_seed": {"cpf": "12345678909"},
                        "expires_at": _naive_utc_now() + timedelta(minutes=15),
                    },
                    "active_domain": "general",
                    "generic_memory": {},
                    "pending_order_selection": None,
                }
            }
        )
        service = FakePolicyService(state)
        policy = ConversationPolicy(service=service)
        _stub_domain_switch(policy, service)

        response = await policy.try_continue_queued_order(
            message="ok",
            user_id=9,
            turn_decision={
                "action": "continue_queue",
                "intent": "queue_continue",
                "domain": "sales",
            },
        )

        self.assertIn("Agora, sobre a compra do veiculo:", response)

    def test_handle_context_switch_prefers_turn_decision_domain_confirmation(self):
        state = FakeState(
            contexts={
                9: {
                    "pending_switch": {
                        "target_domain": "review",
                        "expires_at": _naive_utc_now() + timedelta(minutes=15),
                    },
                    "active_domain": "sales",
                    "generic_memory": {},
                    "pending_order_selection": None,
                }
            }
        )
        service = FakePolicyService(state)
        policy = ConversationPolicy(service=service)
        _stub_domain_switch(policy, service)

        response = policy.handle_context_switch(
            message="quero revisar",
            user_id=9,
            target_domain_hint="review",
            turn_decision={
                "domain": "review",
                "intent": "review_schedule",
                "action": "collect_review_schedule",
            },
        )

        self.assertEqual(
            response,
            "Certo, contexto anterior encerrado. Vamos seguir com agendamento de revisao.",
        )


if __name__ == "__main__":
    unittest.main()