diff --git a/app/services/flows/review_flow.py b/app/services/flows/review_flow.py index 41248c4..1af44d2 100644 --- a/app/services/flows/review_flow.py +++ b/app/services/flows/review_flow.py @@ -36,7 +36,7 @@ class ReviewFlowMixin: if has_list_intent: self._reset_pending_review_states(user_id=user_id) try: - tool_result = await self.registry.execute( + tool_result = await self.tool_executor.execute( "listar_agendamentos_revisao", {"limite": 20}, user_id=user_id, @@ -87,7 +87,7 @@ class ReviewFlowMixin: if missing: return self._render_missing_review_reschedule_fields_prompt(missing) try: - tool_result = await self.registry.execute( + tool_result = await self.tool_executor.execute( "editar_data_revisao", { "protocolo": draft["payload"]["protocolo"], @@ -96,6 +96,11 @@ class ReviewFlowMixin: user_id=user_id, ) except HTTPException as exc: + error = self.tool_executor.coerce_http_error(exc) + if error.get("retryable") and error.get("field"): + draft["payload"].pop(str(error["field"]), None) + draft["expires_at"] = datetime.utcnow() + timedelta(minutes=PENDING_REVIEW_DRAFT_TTL_MINUTES) + self.state.set_entry("pending_review_management_drafts", user_id, draft) return self._http_exception_detail(exc) self.state.pop_entry("pending_review_management_drafts", user_id) return self._fallback_format_tool_result("editar_data_revisao", tool_result) @@ -104,7 +109,7 @@ class ReviewFlowMixin: if missing: return self._render_missing_review_cancel_fields_prompt(missing) try: - tool_result = await self.registry.execute( + tool_result = await self.tool_executor.execute( "cancelar_agendamento_revisao", { "protocolo": draft["payload"]["protocolo"], @@ -113,6 +118,11 @@ class ReviewFlowMixin: user_id=user_id, ) except HTTPException as exc: + error = self.tool_executor.coerce_http_error(exc) + if error.get("retryable") and error.get("field"): + draft["payload"].pop(str(error["field"]), None) + draft["expires_at"] = datetime.utcnow() + timedelta(minutes=PENDING_REVIEW_DRAFT_TTL_MINUTES) + self.state.set_entry("pending_review_management_drafts", user_id, draft) return self._http_exception_detail(exc) self.state.pop_entry("pending_review_management_drafts", user_id) return self._fallback_format_tool_result("cancelar_agendamento_revisao", tool_result) @@ -295,20 +305,23 @@ class ReviewFlowMixin: return self._render_missing_review_fields_prompt(missing) try: - tool_result = await self.registry.execute( + tool_result = await self.tool_executor.execute( "agendar_revisao", draft["payload"], user_id=user_id, ) except HTTPException as exc: + error = self.tool_executor.coerce_http_error(exc) self._capture_review_confirmation_suggestion( tool_name="agendar_revisao", arguments=draft["payload"], exc=exc, user_id=user_id, ) - if self.state.get_entry("pending_review_confirmations", user_id, expire=True): - self.state.pop_entry("pending_review_drafts", user_id) + if error.get("retryable") and error.get("field"): + draft["payload"].pop(str(error["field"]), None) + draft["expires_at"] = datetime.utcnow() + timedelta(minutes=PENDING_REVIEW_DRAFT_TTL_MINUTES) + self.state.set_entry("pending_review_drafts", user_id, draft) return self._http_exception_detail(exc) self.state.pop_entry("pending_review_drafts", user_id) diff --git a/app/services/orchestration/orchestrator_config.py b/app/services/orchestration/orchestrator_config.py index 10f9ece..bdb4b0c 100644 --- a/app/services/orchestration/orchestrator_config.py +++ b/app/services/orchestration/orchestrator_config.py @@ -40,15 +40,6 @@ LOW_VALUE_RESPONSES = { } DETERMINISTIC_RESPONSE_TOOLS = { - "consultar_estoque", - "validar_cliente_venda", - "avaliar_veiculo_troca", - "agendar_revisao", - "listar_agendamentos_revisao", - "cancelar_agendamento_revisao", - "editar_data_revisao", - "cancelar_pedido", - "realizar_pedido", "limpar_contexto_conversa", "continuar_proximo_pedido", "descartar_pedidos_pendentes", diff --git a/app/services/orchestration/orquestrador_service.py b/app/services/orchestration/orquestrador_service.py index 90b543a..4fa1be4 100644 --- a/app/services/orchestration/orquestrador_service.py +++ b/app/services/orchestration/orquestrador_service.py @@ -1,5 +1,7 @@ import logging from datetime import datetime, timedelta +from time import perf_counter +from uuid import uuid4 from fastapi import HTTPException from sqlalchemy.orm import Session @@ -56,16 +58,25 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin): async def handle_message(self, message: str, user_id: int | None = None) -> str: """Processa mensagem, executa tool quando necessario e retorna resposta final.""" + self._turn_trace = { + "request_id": str(uuid4()), + "conversation_id": f"user:{user_id}" if user_id is not None else "anonymous", + "user_id": user_id, + } + self._log_turn_event("turn_received", message=message) + async def finish(response: str, queue_notice: str | None = None) -> str: composed = self._compose_order_aware_response( response=response, user_id=user_id, queue_notice=queue_notice, ) - return await self._maybe_auto_advance_next_order( + final_response = await self._maybe_auto_advance_next_order( base_response=composed, user_id=user_id, ) + self._log_turn_event("turn_completed", response=final_response) + return final_response self._upsert_user_context(user_id=user_id) # Faz uma leitura inicial do turno para ajudar a policy @@ -239,7 +250,8 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin): tools = self.registry.get_tools() - llm_result = await self.llm.generate_response( + llm_result = await self._call_llm_with_trace( + operation="router", message=self._build_router_prompt(user_message=routing_message, user_id=user_id), tools=tools, ) @@ -251,7 +263,8 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin): and self._is_low_value_response(first_pass_text) ) if should_force_tool: - llm_result = await self.llm.generate_response( + llm_result = await self._call_llm_with_trace( + operation="force_tool", message=self._build_force_tool_prompt(user_message=routing_message, user_id=user_id), tools=tools, ) @@ -261,7 +274,7 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin): arguments = llm_result["tool_call"]["arguments"] try: - tool_result = await self.tool_executor.execute( + tool_result = await self._execute_tool_with_trace( tool_name, arguments, user_id=user_id, @@ -295,24 +308,13 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin): queue_notice=queue_notice, ) - final_response = await self.llm.generate_response( - message=self._build_result_prompt( + return await finish( + await self._render_tool_response_with_fallback( user_message=routing_message, user_id=user_id, tool_name=tool_name, tool_result=tool_result, ), - tools=[], - ) - text = (final_response.get("response") or "").strip() - if self._is_low_value_response(text): - return await finish( - self._fallback_format_tool_result(tool_name, tool_result), - queue_notice=queue_notice, - ) - - return await finish( - text or self._fallback_format_tool_result(tool_name, tool_result), queue_notice=queue_notice, ) @@ -352,7 +354,7 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin): ): return None try: - tool_result = await self.tool_executor.execute( + tool_result = await self._execute_tool_with_trace( planned_tool_name, decision_tool_arguments or {}, user_id=user_id, @@ -365,7 +367,8 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin): ) tools = self.registry.get_tools() - llm_result = await self.llm.generate_response( + llm_result = await self._call_llm_with_trace( + operation="orchestration_router", message=self._build_router_prompt(user_message=message, user_id=user_id), tools=tools, ) @@ -405,7 +408,8 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin): if not should_force_tool: return None - llm_result = await self.llm.generate_response( + llm_result = await self._call_llm_with_trace( + operation="orchestration_force_tool", message=self._build_force_tool_prompt(user_message=message, user_id=user_id), tools=tools, ) @@ -420,7 +424,7 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin): return None try: - tool_result = await self.tool_executor.execute( + tool_result = await self._execute_tool_with_trace( forced_tool_name, forced_tool_call.get("arguments") or {}, user_id=user_id, @@ -450,7 +454,7 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin): arguments = decision.get("tool_arguments") if isinstance(decision.get("tool_arguments"), dict) else {} try: - tool_result = await self.tool_executor.execute( + tool_result = await self._execute_tool_with_trace( tool_name, arguments, user_id=user_id, @@ -485,23 +489,13 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin): queue_notice=queue_notice, ) - final_response = await self.llm.generate_response( - message=self._build_result_prompt( + return await finish( + await self._render_tool_response_with_fallback( user_message=message, user_id=user_id, tool_name=tool_name, tool_result=tool_result, ), - tools=[], - ) - text = (final_response.get("response") or "").strip() - if self._is_low_value_response(text): - return await finish( - self._fallback_format_tool_result(tool_name, tool_result), - queue_notice=queue_notice, - ) - return await finish( - text or self._fallback_format_tool_result(tool_name, tool_result), queue_notice=queue_notice, ) @@ -1241,6 +1235,108 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin): conversation_context=conversation_context, ) + def _log_turn_event(self, event: str, **payload) -> None: + trace = getattr(self, "_turn_trace", {}) or {} + logger.info( + "turn_event=%s payload=%s", + event, + { + "request_id": trace.get("request_id"), + "conversation_id": trace.get("conversation_id"), + **payload, + }, + ) + + async def _call_llm_with_trace(self, operation: str, message: str, tools): + started_at = perf_counter() + try: + result = await self.llm.generate_response(message=message, tools=tools) + elapsed_ms = round((perf_counter() - started_at) * 1000, 2) + self._log_turn_event( + "llm_completed", + operation=operation, + elapsed_ms=elapsed_ms, + tool_call=bool(result.get("tool_call")), + ) + return result + except Exception: + elapsed_ms = round((perf_counter() - started_at) * 1000, 2) + self._log_turn_event( + "llm_failed", + operation=operation, + elapsed_ms=elapsed_ms, + ) + raise + + async def _execute_tool_with_trace(self, tool_name: str, arguments: dict, user_id: int | None): + started_at = perf_counter() + try: + result = await self.tool_executor.execute(tool_name, arguments, user_id=user_id) + elapsed_ms = round((perf_counter() - started_at) * 1000, 2) + self._log_turn_event( + "tool_completed", + tool_name=tool_name, + elapsed_ms=elapsed_ms, + arguments=arguments, + result=result, + ) + return result + except HTTPException as exc: + elapsed_ms = round((perf_counter() - started_at) * 1000, 2) + self._log_turn_event( + "tool_failed", + tool_name=tool_name, + elapsed_ms=elapsed_ms, + arguments=arguments, + error=self.tool_executor.coerce_http_error(exc), + ) + raise + + async def _render_tool_response_with_fallback( + self, + user_message: str, + user_id: int | None, + tool_name: str, + tool_result, + ) -> str: + fallback_response = self._fallback_format_tool_result(tool_name, tool_result) + if self._should_use_deterministic_response(tool_name): + self._log_turn_event( + "tool_response_fallback", + tool_name=tool_name, + reason="deterministic_tool", + ) + return fallback_response + + try: + final_response = await self._call_llm_with_trace( + operation="tool_result_response", + message=self._build_result_prompt( + user_message=user_message, + user_id=user_id, + tool_name=tool_name, + tool_result=tool_result, + ), + tools=[], + ) + except Exception: + self._log_turn_event( + "tool_response_fallback", + tool_name=tool_name, + reason="llm_failure", + ) + return fallback_response + + text = (final_response.get("response") or "").strip() + if self._is_low_value_response(text): + self._log_turn_event( + "tool_response_fallback", + tool_name=tool_name, + reason="low_value_response", + ) + return fallback_response + return text or fallback_response + def _http_exception_detail(self, exc: HTTPException) -> str: return self.tool_executor.http_exception_detail(exc) diff --git a/app/services/orchestration/tool_executor.py b/app/services/orchestration/tool_executor.py index 2f4b0df..f85a89a 100644 --- a/app/services/orchestration/tool_executor.py +++ b/app/services/orchestration/tool_executor.py @@ -1,5 +1,7 @@ from fastapi import HTTPException +from app.services.domain.tool_errors import build_tool_error +from app.services.orchestration.orchestrator_config import ORCHESTRATION_CONTROL_TOOLS from app.services.orchestration.orchestrator_config import DETERMINISTIC_RESPONSE_TOOLS from app.services.orchestration.response_formatter import fallback_format_tool_result @@ -13,17 +15,43 @@ class ToolExecutor: return await self.registry.execute(tool_name, arguments, user_id=user_id) def should_use_deterministic_response(self, tool_name: str) -> bool: - return tool_name in DETERMINISTIC_RESPONSE_TOOLS + return tool_name in DETERMINISTIC_RESPONSE_TOOLS or tool_name in ORCHESTRATION_CONTROL_TOOLS - def http_exception_detail(self, exc: HTTPException) -> str: + def coerce_http_error(self, exc: HTTPException) -> dict: detail = exc.detail - if isinstance(detail, str): - return detail if isinstance(detail, dict): message = str(detail.get("message") or "").strip() - if message: - return message - return "Nao foi possivel concluir a operacao solicitada." + return { + "code": str(detail.get("code") or "tool_error").strip() or "tool_error", + "message": message or "Nao foi possivel concluir a operacao solicitada.", + "retryable": bool(detail.get("retryable", False)), + "field": str(detail.get("field") or "").strip() or None, + "status_code": exc.status_code, + "raw": detail, + } + if isinstance(detail, str): + normalized = build_tool_error( + code="tool_error", + message=detail, + retryable=400 <= exc.status_code < 500, + ) + else: + normalized = build_tool_error( + code="tool_error", + message="Nao foi possivel concluir a operacao solicitada.", + retryable=False, + ) + return { + "code": normalized["code"], + "message": normalized["message"], + "retryable": normalized["retryable"], + "field": normalized.get("field"), + "status_code": exc.status_code, + "raw": normalized, + } + + def http_exception_detail(self, exc: HTTPException) -> str: + return self.coerce_http_error(exc)["message"] def fallback_format_tool_result(self, tool_name: str, tool_result) -> str: return fallback_format_tool_result(tool_name=tool_name, tool_result=tool_result) diff --git a/tests/test_turn_decision_contract.py b/tests/test_turn_decision_contract.py index fdb7c1f..d5f5eff 100644 --- a/tests/test_turn_decision_contract.py +++ b/tests/test_turn_decision_contract.py @@ -58,6 +58,15 @@ class FakeToolExecutor: ] return self.result + def coerce_http_error(self, exc): + detail = exc.detail if isinstance(exc.detail, dict) else {} + return { + "code": detail.get("code", "tool_error"), + "message": detail.get("message", str(exc)), + "retryable": bool(detail.get("retryable", False)), + "field": detail.get("field"), + } + class FakePolicyService: def __init__(self, state): @@ -273,7 +282,9 @@ class TurnDecisionContractTests(unittest.IsolatedAsyncioTestCase): service._capture_tool_result_context = lambda **kwargs: None service._should_use_deterministic_response = lambda tool_name: True service._fallback_format_tool_result = lambda tool_name, tool_result: f"{tool_name}:{tool_result['numero_pedido']}" - service._build_result_prompt = lambda **kwargs: "unused" + async def fake_render_tool_response_with_fallback(**kwargs): + return f"{kwargs['tool_name']}:{kwargs['tool_result']['numero_pedido']}" + service._render_tool_response_with_fallback = fake_render_tool_response_with_fallback service._http_exception_detail = lambda exc: str(exc) service._is_low_value_response = lambda text: False