diff --git a/app/db/init_db.py b/app/db/init_db.py index 30b3bb7..3e7c01b 100644 --- a/app/db/init_db.py +++ b/app/db/init_db.py @@ -7,7 +7,7 @@ from app.core.settings import settings from app.db.database import Base, engine from app.db.mock_database import MockBase, mock_engine from app.db.models import Tool -from app.db.mock_models import Customer, Order, ReviewSchedule, Vehicle +from app.db.mock_models import ConversationTurn, Customer, Order, ReviewSchedule, Vehicle from app.db.mock_seed import seed_mock_data from app.db.tool_seed import seed_tools diff --git a/app/db/mock_models.py b/app/db/mock_models.py index a29e0ce..9193425 100644 --- a/app/db/mock_models.py +++ b/app/db/mock_models.py @@ -1,6 +1,7 @@ from sqlalchemy import Boolean, Column, DateTime, Float, ForeignKey, Integer, String, Text, UniqueConstraint from sqlalchemy.sql import func +from app.core.time_utils import utc_now from app.db.mock_database import MockBase @@ -78,3 +79,27 @@ class ReviewSchedule(MockBase): data_hora = Column(DateTime, nullable=False) status = Column(String(20), nullable=False, default="agendado") created_at = Column(DateTime, server_default=func.current_timestamp()) + + +class ConversationTurn(MockBase): + __tablename__ = "conversation_turns" + + id = Column(Integer, primary_key=True, index=True) + request_id = Column(String(80), unique=True, nullable=False, index=True) + conversation_id = Column(String(120), nullable=False, index=True) + user_id = Column(Integer, ForeignKey("users.id"), nullable=True, index=True) + channel = Column(String(40), nullable=True, index=True) + external_id = Column(String(120), nullable=True, index=True) + username = Column(String(120), nullable=True) + user_message = Column(Text, nullable=False) + assistant_response = Column(Text, nullable=True) + turn_status = Column(String(20), nullable=False, default="completed", index=True) + intent = Column(String(80), nullable=True, index=True) + domain = Column(String(40), nullable=True, index=True) + action = Column(String(80), nullable=True) + tool_name = Column(String(120), nullable=True, index=True) + tool_arguments = Column(Text, nullable=True) + error_detail = Column(Text, nullable=True) + elapsed_ms = Column(Float, nullable=True) + started_at = Column(DateTime, nullable=False, default=utc_now, index=True) + completed_at = Column(DateTime, nullable=True, index=True) diff --git a/app/services/orchestration/conversation_history_service.py b/app/services/orchestration/conversation_history_service.py new file mode 100644 index 0000000..64c8427 --- /dev/null +++ b/app/services/orchestration/conversation_history_service.py @@ -0,0 +1,170 @@ +import json +import logging +from datetime import datetime +from typing import Any + +from app.db.mock_database import SessionMockLocal +from app.db.mock_models import ConversationTurn, User + + +logger = logging.getLogger(__name__) + + +class ConversationHistoryService: + """Persiste uma trilha simples de auditoria por turno no banco mock.""" + + def record_turn( + self, + *, + request_id: str, + conversation_id: str, + user_id: int | None, + user_message: str, + assistant_response: str | None, + turn_status: str, + intent: str | None = None, + domain: str | None = None, + action: str | None = None, + tool_name: str | None = None, + tool_arguments: dict[str, Any] | None = None, + error_detail: str | None = None, + started_at: datetime | None = None, + completed_at: datetime | None = None, + elapsed_ms: float | None = None, + ) -> None: + db = SessionMockLocal() + try: + channel = None + external_id = None + username = None + if user_id is not None: + user = db.query(User).filter(User.id == user_id).first() + if user: + channel = user.channel + external_id = user.external_id + username = user.username + + payload = { + "request_id": str(request_id or ""), + "conversation_id": str(conversation_id or "anonymous"), + "user_id": user_id, + "channel": channel, + "external_id": external_id, + "username": username, + "user_message": str(user_message or ""), + "assistant_response": assistant_response, + "turn_status": str(turn_status or "completed"), + "intent": self._clean_text(intent), + "domain": self._clean_text(domain), + "action": self._clean_text(action), + "tool_name": self._clean_text(tool_name), + "tool_arguments": self._serialize_json(tool_arguments), + "error_detail": error_detail, + } + if started_at is not None: + payload["started_at"] = started_at + if completed_at is not None: + payload["completed_at"] = completed_at + if elapsed_ms is not None: + payload["elapsed_ms"] = elapsed_ms + + record = ConversationTurn(**payload) + db.add(record) + db.commit() + except Exception: + db.rollback() + logger.exception( + "Falha ao persistir historico de conversa.", + extra={ + "request_id": request_id, + "conversation_id": conversation_id, + "user_id": user_id, + }, + ) + finally: + db.close() + + def list_turns( + self, + *, + user_id: int | None = None, + conversation_id: str | None = None, + request_id: str | None = None, + turn_status: str | None = None, + limit: int = 20, + ) -> list[dict[str, Any]]: + db = SessionMockLocal() + try: + query = db.query(ConversationTurn) + + if user_id is not None: + query = query.filter(ConversationTurn.user_id == user_id) + + normalized_conversation_id = self._clean_text(conversation_id) + if normalized_conversation_id: + query = query.filter(ConversationTurn.conversation_id == normalized_conversation_id) + + normalized_request_id = self._clean_text(request_id) + if normalized_request_id: + query = query.filter(ConversationTurn.request_id == normalized_request_id) + + normalized_turn_status = self._clean_text(turn_status) + if normalized_turn_status: + query = query.filter(ConversationTurn.turn_status == normalized_turn_status) + + safe_limit = max(1, min(int(limit or 20), 100)) + rows = ( + query.order_by(ConversationTurn.started_at.desc(), ConversationTurn.id.desc()) + .limit(safe_limit) + .all() + ) + return [self._serialize_row(row) for row in rows] + finally: + db.close() + + def _clean_text(self, value: str | None) -> str | None: + text = str(value or "").strip() + return text or None + + def _serialize_row(self, row: ConversationTurn) -> dict[str, Any]: + return { + "id": row.id, + "request_id": row.request_id, + "conversation_id": row.conversation_id, + "user_id": row.user_id, + "channel": row.channel, + "external_id": row.external_id, + "username": row.username, + "user_message": row.user_message, + "assistant_response": row.assistant_response, + "turn_status": row.turn_status, + "intent": row.intent, + "domain": row.domain, + "action": row.action, + "tool_name": row.tool_name, + "tool_arguments": self._deserialize_json(row.tool_arguments), + "error_detail": row.error_detail, + "elapsed_ms": row.elapsed_ms, + "started_at": row.started_at.isoformat() if row.started_at else None, + "completed_at": row.completed_at.isoformat() if row.completed_at else None, + } + + def _deserialize_json(self, value: str | None) -> dict[str, Any] | None: + text = str(value or "").strip() + if not text: + return None + try: + payload = json.loads(text) + except (TypeError, ValueError): + return None + return payload if isinstance(payload, dict) else None + + def _serialize_json(self, value: dict[str, Any] | None) -> str | None: + if not isinstance(value, dict) or not value: + return None + return json.dumps( + value, + ensure_ascii=True, + separators=(",", ":"), + default=str, + ) diff --git a/app/services/orchestration/orquestrador_service.py b/app/services/orchestration/orquestrador_service.py index 9b57575..945f576 100644 --- a/app/services/orchestration/orquestrador_service.py +++ b/app/services/orchestration/orquestrador_service.py @@ -1,3 +1,4 @@ +import json import logging from datetime import datetime, timedelta from app.core.time_utils import utc_now @@ -18,6 +19,7 @@ from app.services.orchestration.conversation_policy import ConversationPolicy from app.services.orchestration.conversation_state_repository import ConversationStateRepository from app.services.orchestration.entity_normalizer import EntityNormalizer from app.services.orchestration.message_planner import MessagePlanner +from app.services.orchestration.conversation_history_service import ConversationHistoryService from app.services.orchestration.state_repository_factory import get_conversation_state_repository from app.services.flows.order_flow import OrderFlowMixin from app.services.orchestration.prompt_builders import ( @@ -48,6 +50,7 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin): self.registry = ToolRegistry(db, extra_handlers=self._build_orchestration_tool_handlers()) self.tool_executor = ToolExecutor(registry=self.registry) self.policy = ConversationPolicy(service=self) + self.history_service = ConversationHistoryService() def _build_orchestration_tool_handlers(self) -> dict: return { @@ -59,14 +62,19 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin): async def handle_message(self, message: str, user_id: int | None = None) -> str: """Processa mensagem, executa tool quando necessario e retorna resposta final.""" + turn_started_at = utc_now() + turn_started_perf = perf_counter() + turn_history_persisted = False self._turn_trace = { "request_id": str(uuid4()), "conversation_id": f"user:{user_id}" if user_id is not None else "anonymous", "user_id": user_id, + "started_at": turn_started_at, } self._log_turn_event("turn_received", message=message) async def finish(response: str, queue_notice: str | None = None) -> str: + nonlocal turn_history_persisted composed = self._compose_order_aware_response( response=response, user_id=user_id, @@ -76,373 +84,394 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin): base_response=composed, user_id=user_id, ) + self._turn_trace["elapsed_ms"] = round((perf_counter() - turn_started_perf) * 1000, 2) self._log_turn_event("turn_completed", response=final_response) + if not turn_history_persisted: + self._finalize_turn_history( + user_message=message, + assistant_response=final_response, + turn_status="completed", + ) + turn_history_persisted = True return final_response - self._upsert_user_context(user_id=user_id) - if hasattr(self, "policy") and self._is_order_selection_reset_message(message): + try: + self._upsert_user_context(user_id=user_id) + if hasattr(self, "policy") and self._is_order_selection_reset_message(message): + reset_override = await self._try_handle_immediate_context_reset( + message=message, + user_id=user_id, + turn_decision={"action": "clear_context"}, + finish=finish, + ) + if reset_override: + return reset_override + if hasattr(self, "policy"): + pending_switch_override = self._handle_context_switch( + message=message, + user_id=user_id, + target_domain_hint="general", + turn_decision=None, + ) + if pending_switch_override: + return await finish(pending_switch_override) + pending_stock_selection_follow_up = await self._try_handle_pending_stock_selection_follow_up( + message=message, + user_id=user_id, + finish=finish, + ) + if pending_stock_selection_follow_up: + return pending_stock_selection_follow_up + active_sales_follow_up = await self._try_handle_active_sales_follow_up( + message=message, + user_id=user_id, + finish=finish, + ) + if active_sales_follow_up: + return active_sales_follow_up + active_review_follow_up = await self._try_handle_active_review_follow_up( + message=message, + user_id=user_id, + finish=finish, + ) + if active_review_follow_up: + return active_review_follow_up + # Faz uma leitura inicial do turno para ajudar a policy + # com fila, troca de contexto e comandos globais. + early_turn_decision = await self._extract_turn_decision_with_llm( + message=message, + user_id=user_id, + ) reset_override = await self._try_handle_immediate_context_reset( message=message, user_id=user_id, - turn_decision={"action": "clear_context"}, + turn_decision=early_turn_decision, finish=finish, ) if reset_override: return reset_override - if hasattr(self, "policy"): - pending_switch_override = self._handle_context_switch( + pending_order_selection = await self._try_resolve_pending_order_selection( message=message, user_id=user_id, - target_domain_hint="general", - turn_decision=None, + turn_decision=early_turn_decision, ) - if pending_switch_override: - return await finish(pending_switch_override) - pending_stock_selection_follow_up = await self._try_handle_pending_stock_selection_follow_up( - message=message, - user_id=user_id, - finish=finish, - ) - if pending_stock_selection_follow_up: - return pending_stock_selection_follow_up - active_sales_follow_up = await self._try_handle_active_sales_follow_up( - message=message, - user_id=user_id, - finish=finish, - ) - if active_sales_follow_up: - return active_sales_follow_up - active_review_follow_up = await self._try_handle_active_review_follow_up( - message=message, - user_id=user_id, - finish=finish, - ) - if active_review_follow_up: - return active_review_follow_up - # Faz uma leitura inicial do turno para ajudar a policy - # com fila, troca de contexto e comandos globais. - early_turn_decision = await self._extract_turn_decision_with_llm( - message=message, - user_id=user_id, - ) - reset_override = await self._try_handle_immediate_context_reset( - message=message, - user_id=user_id, - turn_decision=early_turn_decision, - finish=finish, - ) - if reset_override: - return reset_override - pending_order_selection = await self._try_resolve_pending_order_selection( - message=message, - user_id=user_id, - turn_decision=early_turn_decision, - ) - if pending_order_selection: - return pending_order_selection - queued_followup = await self._try_continue_queued_order( - message=message, - user_id=user_id, - turn_decision=early_turn_decision, - ) - if queued_followup: - return queued_followup + if pending_order_selection: + return pending_order_selection + queued_followup = await self._try_continue_queued_order( + message=message, + user_id=user_id, + turn_decision=early_turn_decision, + ) + if queued_followup: + return queued_followup - message_plan = await self._extract_message_plan_with_llm( - message=message, - user_id=user_id, - ) - routing_plan = { - "orders": [ - { - "domain": item.get("domain", "general"), - "message": item.get("message", ""), - } - for item in message_plan.get("orders", []) - ] - } + message_plan = await self._extract_message_plan_with_llm( + message=message, + user_id=user_id, + ) + routing_plan = { + "orders": [ + { + "domain": item.get("domain", "general"), + "message": item.get("message", ""), + } + for item in message_plan.get("orders", []) + ] + } - ( - routing_message, - queue_notice, - queue_early_response, - ) = self._prepare_message_for_single_order( - message=message, - user_id=user_id, - routing_plan=routing_plan, - ) - if queue_early_response: - return await finish(queue_early_response, queue_notice=queue_notice) + ( + routing_message, + queue_notice, + queue_early_response, + ) = self._prepare_message_for_single_order( + message=message, + user_id=user_id, + routing_plan=routing_plan, + ) + if queue_early_response: + return await finish(queue_early_response, queue_notice=queue_notice) - extracted_entities = self._resolve_entities_for_message_plan( - message_plan=message_plan, - routed_message=routing_message, - ) - # Depois do roteamento para um unico pedido, pede a decisao - # estruturada do turno final que sera executado. - if (routing_message or "").strip() == (message or "").strip(): - turn_decision = early_turn_decision - else: - turn_decision = await self._extract_turn_decision_with_llm( + extracted_entities = self._resolve_entities_for_message_plan( + message_plan=message_plan, + routed_message=routing_message, + ) + # Depois do roteamento para um unico pedido, pede a decisao + # estruturada do turno final que sera executado. + if (routing_message or "").strip() == (message or "").strip(): + turn_decision = early_turn_decision + else: + turn_decision = await self._extract_turn_decision_with_llm( + message=routing_message, + user_id=user_id, + ) + self._capture_turn_decision_trace(turn_decision) + llm_extracted_entities = await self._extract_entities_with_llm( message=routing_message, user_id=user_id, ) - llm_extracted_entities = await self._extract_entities_with_llm( - message=routing_message, - user_id=user_id, - ) - extracted_entities = self._merge_extracted_entities( - extracted_entities, - llm_extracted_entities, - ) - if self._has_useful_turn_decision(turn_decision): - extracted_entities = self._merge_extracted_entities( - extracted_entities, - self._extracted_entities_from_turn_decision(turn_decision), - ) - self._capture_generic_memory( - user_id=user_id, - llm_generic_fields=extracted_entities.get("generic_memory", {}), - ) - sales_search_context = await self._extract_missing_sales_search_context_with_llm( - message=routing_message, - user_id=user_id, - turn_decision=turn_decision, - extracted_entities=extracted_entities, - ) - if sales_search_context: extracted_entities = self._merge_extracted_entities( extracted_entities, - { - "generic_memory": sales_search_context, - "review_fields": {}, - "review_management_fields": {}, - "order_fields": {}, - "cancel_order_fields": {}, - "intents": {}, - }, + llm_extracted_entities, ) + if self._has_useful_turn_decision(turn_decision): + extracted_entities = self._merge_extracted_entities( + extracted_entities, + self._extracted_entities_from_turn_decision(turn_decision), + ) self._capture_generic_memory( user_id=user_id, - llm_generic_fields=sales_search_context, + llm_generic_fields=extracted_entities.get("generic_memory", {}), ) + sales_search_context = await self._extract_missing_sales_search_context_with_llm( + message=routing_message, + user_id=user_id, + turn_decision=turn_decision, + extracted_entities=extracted_entities, + ) + if sales_search_context: + extracted_entities = self._merge_extracted_entities( + extracted_entities, + { + "generic_memory": sales_search_context, + "review_fields": {}, + "review_management_fields": {}, + "order_fields": {}, + "cancel_order_fields": {}, + "intents": {}, + }, + ) + self._capture_generic_memory( + user_id=user_id, + llm_generic_fields=sales_search_context, + ) - should_prioritize_review_flow = self._should_prioritize_review_flow( - turn_decision=turn_decision, - extracted_entities=extracted_entities, - user_id=user_id, - ) - should_prioritize_order_flow = self._should_prioritize_order_flow( - turn_decision=turn_decision, - extracted_entities=extracted_entities, - user_id=user_id, - message=routing_message, - ) - should_prioritize_review_management = self._should_prioritize_review_management( - turn_decision=turn_decision, - user_id=user_id, - ) - domain_hint = self._domain_from_turn_decision(turn_decision) - if domain_hint == "general": - domain_hint = self._domain_from_intents(extracted_entities.get("intents", {})) - if self._should_consume_sales_follow_up_in_active_flow( - message=routing_message, - user_id=user_id, - extracted_entities=extracted_entities, - ): - domain_hint = "sales" - context_switch_response = self._handle_context_switch( - message=routing_message, - user_id=user_id, - target_domain_hint=domain_hint, - turn_decision=turn_decision, - ) - if context_switch_response: - return await finish(context_switch_response, queue_notice=queue_notice) - - self._update_active_domain(user_id=user_id, domain_hint=domain_hint) + should_prioritize_review_flow = self._should_prioritize_review_flow( + turn_decision=turn_decision, + extracted_entities=extracted_entities, + user_id=user_id, + ) + should_prioritize_order_flow = self._should_prioritize_order_flow( + turn_decision=turn_decision, + extracted_entities=extracted_entities, + user_id=user_id, + message=routing_message, + ) + should_prioritize_review_management = self._should_prioritize_review_management( + turn_decision=turn_decision, + user_id=user_id, + ) + domain_hint = self._domain_from_turn_decision(turn_decision) + if domain_hint == "general": + domain_hint = self._domain_from_intents(extracted_entities.get("intents", {})) + if self._should_consume_sales_follow_up_in_active_flow( + message=routing_message, + user_id=user_id, + extracted_entities=extracted_entities, + ): + domain_hint = "sales" + context_switch_response = self._handle_context_switch( + message=routing_message, + user_id=user_id, + target_domain_hint=domain_hint, + turn_decision=turn_decision, + ) + if context_switch_response: + return await finish(context_switch_response, queue_notice=queue_notice) - orchestration_override = await self._try_execute_orchestration_control_tool( - message=routing_message, - user_id=user_id, - turn_decision=turn_decision, - extracted_entities=extracted_entities, - queue_notice=queue_notice, - finish=finish, - ) - if orchestration_override: - return orchestration_override + self._update_active_domain(user_id=user_id, domain_hint=domain_hint) - decision_action = str(turn_decision.get("action") or "") - decision_response = str(turn_decision.get("response_to_user") or "").strip() - if ( - decision_action == "ask_missing_fields" - and decision_response - and not should_prioritize_review_flow - and not should_prioritize_review_management - and not should_prioritize_order_flow - ): - return await finish(decision_response, queue_notice=queue_notice) - if ( - decision_action == "answer_user" - and decision_response - and not should_prioritize_review_flow - and not should_prioritize_review_management - and not should_prioritize_order_flow - ): - return await finish(decision_response, queue_notice=queue_notice) - - if not should_prioritize_order_flow: - planned_tool_response = await self._try_execute_business_tool_from_turn_decision( + orchestration_override = await self._try_execute_orchestration_control_tool( message=routing_message, user_id=user_id, turn_decision=turn_decision, + extracted_entities=extracted_entities, queue_notice=queue_notice, finish=finish, ) - if planned_tool_response: - return planned_tool_response + if orchestration_override: + return orchestration_override - review_management_response = await self._try_handle_review_management( - message=routing_message, - user_id=user_id, - extracted_fields=extracted_entities.get("review_management_fields", {}), - intents={}, - turn_decision=turn_decision, - ) - if review_management_response: - return await finish(review_management_response, queue_notice=queue_notice) + decision_action = str(turn_decision.get("action") or "") + decision_response = str(turn_decision.get("response_to_user") or "").strip() + if ( + decision_action == "ask_missing_fields" + and decision_response + and not should_prioritize_review_flow + and not should_prioritize_review_management + and not should_prioritize_order_flow + ): + return await finish(decision_response, queue_notice=queue_notice) + if ( + decision_action == "answer_user" + and decision_response + and not should_prioritize_review_flow + and not should_prioritize_review_management + and not should_prioritize_order_flow + ): + return await finish(decision_response, queue_notice=queue_notice) - # 1) Se houver sugestao pendente de horario e o usuario confirmou ("pode/sim"), - # agenda direto no horario sugerido. - confirmation_response = await self._try_confirm_pending_review( - message=routing_message, - user_id=user_id, - extracted_review_fields=extracted_entities.get("review_fields", {}), - ) - if confirmation_response: - return await finish(confirmation_response, queue_notice=queue_notice) - # 2) Fluxo de coleta incremental de dados da revisao (slot filling). - # Evita pedir tudo de novo quando o usuario responde em partes. - review_response = await self._try_collect_and_schedule_review( - message=routing_message, - user_id=user_id, - extracted_fields=extracted_entities.get("review_fields", {}), - intents={}, - turn_decision=turn_decision, - ) - if review_response: - return await finish(review_response, queue_notice=queue_notice) - # 3) Fluxo de coleta incremental para cancelamento de pedido. - cancel_order_response = await self._try_collect_and_cancel_order( - message=routing_message, - user_id=user_id, - extracted_fields=extracted_entities.get("cancel_order_fields", {}), - intents={}, - turn_decision=turn_decision, - ) - if cancel_order_response: - return await finish(cancel_order_response, queue_notice=queue_notice) - order_listing_response = await self._try_handle_order_listing( - message=routing_message, - user_id=user_id, - intents={}, - turn_decision=turn_decision, - ) - if order_listing_response: - return await finish(order_listing_response, queue_notice=queue_notice) - # 4) Fluxo de coleta incremental para realizacao de pedido. - order_response = await self._try_collect_and_create_order( - message=routing_message, - user_id=user_id, - extracted_fields=extracted_entities.get("order_fields", {}), - intents={}, - turn_decision=turn_decision, - ) - if order_response: - return await finish(order_response, queue_notice=queue_notice) + if not should_prioritize_order_flow: + planned_tool_response = await self._try_execute_business_tool_from_turn_decision( + message=routing_message, + user_id=user_id, + turn_decision=turn_decision, + queue_notice=queue_notice, + finish=finish, + ) + if planned_tool_response: + return planned_tool_response - tools = self.registry.get_tools() + review_management_response = await self._try_handle_review_management( + message=routing_message, + user_id=user_id, + extracted_fields=extracted_entities.get("review_management_fields", {}), + intents={}, + turn_decision=turn_decision, + ) + if review_management_response: + return await finish(review_management_response, queue_notice=queue_notice) - llm_result = await self._call_llm_with_trace( - operation="router", - message=self._build_router_prompt(user_message=routing_message, user_id=user_id), - tools=tools, - ) + # 1) Se houver sugestao pendente de horario e o usuario confirmou ("pode/sim"), + # agenda direto no horario sugerido. + confirmation_response = await self._try_confirm_pending_review( + message=routing_message, + user_id=user_id, + extracted_review_fields=extracted_entities.get("review_fields", {}), + ) + if confirmation_response: + return await finish(confirmation_response, queue_notice=queue_notice) + # 2) Fluxo de coleta incremental de dados da revisao (slot filling). + # Evita pedir tudo de novo quando o usuario responde em partes. + review_response = await self._try_collect_and_schedule_review( + message=routing_message, + user_id=user_id, + extracted_fields=extracted_entities.get("review_fields", {}), + intents={}, + turn_decision=turn_decision, + ) + if review_response: + return await finish(review_response, queue_notice=queue_notice) + # 3) Fluxo de coleta incremental para cancelamento de pedido. + cancel_order_response = await self._try_collect_and_cancel_order( + message=routing_message, + user_id=user_id, + extracted_fields=extracted_entities.get("cancel_order_fields", {}), + intents={}, + turn_decision=turn_decision, + ) + if cancel_order_response: + return await finish(cancel_order_response, queue_notice=queue_notice) + order_listing_response = await self._try_handle_order_listing( + message=routing_message, + user_id=user_id, + intents={}, + turn_decision=turn_decision, + ) + if order_listing_response: + return await finish(order_listing_response, queue_notice=queue_notice) + # 4) Fluxo de coleta incremental para realizacao de pedido. + order_response = await self._try_collect_and_create_order( + message=routing_message, + user_id=user_id, + extracted_fields=extracted_entities.get("order_fields", {}), + intents={}, + turn_decision=turn_decision, + ) + if order_response: + return await finish(order_response, queue_notice=queue_notice) + + tools = self.registry.get_tools() - first_pass_text = (llm_result.get("response") or "").strip() - should_force_tool = ( - not llm_result["tool_call"] - and self._has_operational_intent(extracted_entities) - and self._is_low_value_response(first_pass_text) - ) - if should_force_tool: llm_result = await self._call_llm_with_trace( - operation="force_tool", - message=self._build_force_tool_prompt(user_message=routing_message, user_id=user_id), + operation="router", + message=self._build_router_prompt(user_message=routing_message, user_id=user_id), tools=tools, ) - if llm_result["tool_call"]: - tool_name, arguments = self._normalize_tool_invocation( - tool_name=llm_result["tool_call"]["name"], - arguments=llm_result["tool_call"]["arguments"], - user_id=user_id, + first_pass_text = (llm_result.get("response") or "").strip() + should_force_tool = ( + not llm_result["tool_call"] + and self._has_operational_intent(extracted_entities) + and self._is_low_value_response(first_pass_text) ) + if should_force_tool: + llm_result = await self._call_llm_with_trace( + operation="force_tool", + message=self._build_force_tool_prompt(user_message=routing_message, user_id=user_id), + tools=tools, + ) - try: - tool_result = await self._execute_tool_with_trace( - tool_name, - arguments, + if llm_result["tool_call"]: + tool_name, arguments = self._normalize_tool_invocation( + tool_name=llm_result["tool_call"]["name"], + arguments=llm_result["tool_call"]["arguments"], user_id=user_id, ) - except HTTPException as exc: - self._capture_review_confirmation_suggestion( + + try: + tool_result = await self._execute_tool_with_trace( + tool_name, + arguments, + user_id=user_id, + ) + except HTTPException as exc: + self._capture_review_confirmation_suggestion( + tool_name=tool_name, + arguments=arguments, + exc=exc, + user_id=user_id, + ) + return await finish(self._http_exception_detail(exc), queue_notice=queue_notice) + + stock_suggestion_response = await self._maybe_build_stock_suggestion_response( tool_name=tool_name, arguments=arguments, - exc=exc, + tool_result=tool_result, + user_id=user_id, + ) + if stock_suggestion_response: + return await finish(stock_suggestion_response, queue_notice=queue_notice) + self._capture_successful_tool_side_effects( + tool_name=tool_name, + arguments=arguments, + tool_result=tool_result, user_id=user_id, ) - return await finish(self._http_exception_detail(exc), queue_notice=queue_notice) - stock_suggestion_response = await self._maybe_build_stock_suggestion_response( - tool_name=tool_name, - arguments=arguments, - tool_result=tool_result, - user_id=user_id, - ) - if stock_suggestion_response: - return await finish(stock_suggestion_response, queue_notice=queue_notice) - self._capture_successful_tool_side_effects( - tool_name=tool_name, - arguments=arguments, - tool_result=tool_result, - user_id=user_id, - ) + if self._should_use_deterministic_response(tool_name): + return await finish( + self._fallback_format_tool_result(tool_name, tool_result), + queue_notice=queue_notice, + ) - if self._should_use_deterministic_response(tool_name): return await finish( - self._fallback_format_tool_result(tool_name, tool_result), + await self._render_tool_response_with_fallback( + user_message=routing_message, + user_id=user_id, + tool_name=tool_name, + tool_result=tool_result, + ), queue_notice=queue_notice, ) - return await finish( - await self._render_tool_response_with_fallback( - user_message=routing_message, - user_id=user_id, - tool_name=tool_name, - tool_result=tool_result, - ), - queue_notice=queue_notice, - ) - - text = (llm_result.get("response") or "").strip() - if self._is_low_value_response(text): - return await finish( - "Entendi. Pode me dar mais detalhes para eu consultar corretamente?", - queue_notice=queue_notice, - ) - return await finish(text, queue_notice=queue_notice) + text = (llm_result.get("response") or "").strip() + if self._is_low_value_response(text): + return await finish( + "Entendi. Pode me dar mais detalhes para eu consultar corretamente?", + queue_notice=queue_notice, + ) + return await finish(text, queue_notice=queue_notice) + except Exception as exc: + if not turn_history_persisted: + self._turn_trace["elapsed_ms"] = round((perf_counter() - turn_started_perf) * 1000, 2) + self._finalize_turn_history( + user_message=message, + assistant_response=None, + turn_status="failed", + error_detail=self._format_turn_error(exc), + ) + turn_history_persisted = True + raise async def _try_execute_orchestration_control_tool( self, @@ -1842,6 +1871,60 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin): conversation_context=conversation_context, ) + def _capture_turn_decision_trace(self, turn_decision: dict | None) -> None: + trace = getattr(self, "_turn_trace", None) + if not isinstance(trace, dict) or not isinstance(turn_decision, dict): + return + trace["intent"] = str(turn_decision.get("intent") or "").strip() or None + trace["domain"] = str(turn_decision.get("domain") or "").strip() or None + trace["action"] = str(turn_decision.get("action") or "").strip() or None + + def _capture_tool_invocation_trace(self, tool_name: str, arguments: dict | None) -> None: + trace = getattr(self, "_turn_trace", None) + if not isinstance(trace, dict): + return + trace["tool_name"] = str(tool_name or "").strip() or None + trace["tool_arguments"] = dict(arguments or {}) if isinstance(arguments, dict) else None + + def _finalize_turn_history( + self, + *, + user_message: str, + assistant_response: str | None, + turn_status: str, + error_detail: str | None = None, + ) -> None: + history_service = getattr(self, "history_service", None) + if history_service is None: + return + + trace = getattr(self, "_turn_trace", {}) or {} + history_service.record_turn( + request_id=str(trace.get("request_id") or ""), + conversation_id=str(trace.get("conversation_id") or "anonymous"), + user_id=trace.get("user_id"), + user_message=str(user_message or ""), + assistant_response=assistant_response, + turn_status=str(turn_status or "completed"), + intent=trace.get("intent"), + domain=trace.get("domain"), + action=trace.get("action"), + tool_name=trace.get("tool_name"), + tool_arguments=trace.get("tool_arguments"), + error_detail=error_detail, + started_at=trace.get("started_at"), + completed_at=utc_now(), + elapsed_ms=trace.get("elapsed_ms"), + ) + + def _format_turn_error(self, exc: Exception) -> str: + if isinstance(exc, HTTPException): + detail = exc.detail + if isinstance(detail, dict): + return json.dumps(detail, ensure_ascii=True, separators=(",", ":"), default=str) + return str(detail) + return f"{type(exc).__name__}: {exc}" + def _log_turn_event(self, event: str, **payload) -> None: trace = getattr(self, "_turn_trace", {}) or {} logger.info( @@ -1933,6 +2016,7 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin): arguments=arguments, user_id=user_id, ) + self._capture_tool_invocation_trace(tool_name=tool_name, arguments=arguments) started_at = perf_counter() try: result = await self.tool_executor.execute(tool_name, arguments, user_id=user_id) diff --git a/tests/test_conversation_history_service.py b/tests/test_conversation_history_service.py new file mode 100644 index 0000000..4d20d50 --- /dev/null +++ b/tests/test_conversation_history_service.py @@ -0,0 +1,226 @@ +import unittest +from datetime import datetime +from types import SimpleNamespace +from unittest.mock import patch + +from sqlalchemy import create_engine +from sqlalchemy.orm import sessionmaker + +from app.db.mock_database import MockBase +from app.db.mock_models import ConversationTurn, User +from app.services.orchestration.conversation_history_service import ConversationHistoryService + + +class _FakeQuery: + def __init__(self, result): + self.result = result + + def filter(self, *args, **kwargs): + return self + + def first(self): + return self.result + + +class _FakeSession: + def __init__(self, user=None): + self.user = user + self.added = [] + self.committed = False + self.rolled_back = False + self.closed = False + + def query(self, model): + return _FakeQuery(self.user) + + def add(self, item): + self.added.append(item) + + def commit(self): + self.committed = True + + def rollback(self): + self.rolled_back = True + + def close(self): + self.closed = True + + +class ConversationHistoryServiceTests(unittest.TestCase): + def test_record_turn_persists_conversation_audit_record(self): + session = _FakeSession( + user=SimpleNamespace( + id=7, + channel="telegram", + external_id="12345", + username="cliente_teste", + ) + ) + service = ConversationHistoryService() + started_at = datetime(2026, 3, 16, 18, 0, 0) + completed_at = datetime(2026, 3, 16, 18, 0, 5) + + with patch( + "app.services.orchestration.conversation_history_service.SessionMockLocal", + return_value=session, + ): + service.record_turn( + request_id="req-123", + conversation_id="user:7", + user_id=7, + user_message="quero comprar um carro", + assistant_response="Encontrei 2 veiculo(s).", + turn_status="completed", + intent="order_create", + domain="sales", + action="collect_order_create", + tool_name="consultar_estoque", + tool_arguments={"preco_max": 80000}, + error_detail=None, + started_at=started_at, + completed_at=completed_at, + elapsed_ms=512.4, + ) + + self.assertTrue(session.committed) + self.assertTrue(session.closed) + self.assertEqual(len(session.added), 1) + record = session.added[0] + self.assertEqual(record.request_id, "req-123") + self.assertEqual(record.conversation_id, "user:7") + self.assertEqual(record.user_id, 7) + self.assertEqual(record.channel, "telegram") + self.assertEqual(record.external_id, "12345") + self.assertEqual(record.username, "cliente_teste") + self.assertEqual(record.intent, "order_create") + self.assertEqual(record.domain, "sales") + self.assertEqual(record.action, "collect_order_create") + self.assertEqual(record.tool_name, "consultar_estoque") + self.assertEqual(record.tool_arguments, '{"preco_max":80000}') + self.assertEqual(record.started_at, started_at) + self.assertEqual(record.completed_at, completed_at) + self.assertEqual(record.elapsed_ms, 512.4) + + def test_list_turns_filters_and_orders_recent_first(self): + engine = create_engine("sqlite:///:memory:") + SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine) + MockBase.metadata.create_all(bind=engine) + self.addCleanup(engine.dispose) + + db = SessionLocal() + user = User(channel="telegram", external_id="999", name="Cliente Teste", username="cliente_teste") + db.add(user) + db.commit() + db.refresh(user) + user_id = user.id + db.add_all( + [ + ConversationTurn( + request_id="req-old", + conversation_id=f"user:{user_id}", + user_id=user_id, + channel="telegram", + external_id="999", + username="cliente_teste", + user_message="oi", + assistant_response="ola", + turn_status="completed", + intent="general", + domain="general", + action="answer_user", + tool_name=None, + tool_arguments=None, + started_at=datetime(2026, 3, 16, 10, 0, 0), + completed_at=datetime(2026, 3, 16, 10, 0, 1), + elapsed_ms=100.0, + ), + ConversationTurn( + request_id="req-new", + conversation_id=f"user:{user_id}", + user_id=user_id, + channel="telegram", + external_id="999", + username="cliente_teste", + user_message="quero comprar", + assistant_response="pedido criado", + turn_status="completed", + intent="order_create", + domain="sales", + action="call_tool", + tool_name="realizar_pedido", + tool_arguments='{"vehicle_id":1}', + started_at=datetime(2026, 3, 16, 11, 0, 0), + completed_at=datetime(2026, 3, 16, 11, 0, 2), + elapsed_ms=230.0, + ), + ConversationTurn( + request_id="req-failed", + conversation_id="user:999", + user_id=None, + channel=None, + external_id=None, + username=None, + user_message="erro", + assistant_response=None, + turn_status="failed", + intent="general", + domain="general", + action="answer_user", + tool_name=None, + tool_arguments=None, + error_detail="RuntimeError: boom", + started_at=datetime(2026, 3, 16, 12, 0, 0), + completed_at=datetime(2026, 3, 16, 12, 0, 1), + elapsed_ms=300.0, + ), + ] + ) + db.commit() + db.close() + + service = ConversationHistoryService() + with patch( + "app.services.orchestration.conversation_history_service.SessionMockLocal", + SessionLocal, + ): + items = service.list_turns(user_id=user_id, turn_status="completed", limit=5) + + self.assertEqual(len(items), 2) + self.assertEqual(items[0]["request_id"], "req-new") + self.assertEqual(items[1]["request_id"], "req-old") + self.assertEqual(items[0]["tool_arguments"], {"vehicle_id": 1}) + self.assertEqual(items[0]["turn_status"], "completed") + self.assertEqual(items[0]["user_id"], user_id) + + def test_list_turns_can_filter_by_request_id(self): + engine = create_engine("sqlite:///:memory:") + SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine) + MockBase.metadata.create_all(bind=engine) + self.addCleanup(engine.dispose) + + db = SessionLocal() + db.add( + ConversationTurn( + request_id="req-only", + conversation_id="user:42", + user_id=None, + user_message="teste", + assistant_response="ok", + turn_status="completed", + started_at=datetime(2026, 3, 16, 13, 0, 0), + completed_at=datetime(2026, 3, 16, 13, 0, 1), + ) + ) + db.commit() + db.close() + + service = ConversationHistoryService() + with patch( + "app.services.orchestration.conversation_history_service.SessionMockLocal", + SessionLocal, + ): + items = service.list_turns(request_id="req-only", limit=5) + + self.assertEqual(len(items), 1) + self.assertEqual(items[0]["request_id"], "req-only") + self.assertEqual(items[0]["conversation_id"], "user:42") diff --git a/tests/test_turn_decision_contract.py b/tests/test_turn_decision_contract.py index 1377c9b..b986415 100644 --- a/tests/test_turn_decision_contract.py +++ b/tests/test_turn_decision_contract.py @@ -1,5 +1,6 @@ import os import unittest +from types import SimpleNamespace os.environ.setdefault("DEBUG", "false") @@ -1111,6 +1112,196 @@ class TurnDecisionContractTests(unittest.IsolatedAsyncioTestCase): self.assertEqual(str(decision.get("action") or ""), "answer_user") self.assertEqual(str(decision.get("response_to_user") or "").strip(), "Resposta direta do contrato.") + async def test_handle_message_persists_completed_turn_history(self): + state = FakeState( + contexts={ + 1: { + "active_domain": "general", + "generic_memory": {}, + "shared_memory": {}, + "order_queue": [], + "pending_order_selection": None, + "pending_switch": None, + "last_stock_results": [], + "selected_vehicle": None, + } + } + ) + history_calls = [] + service = OrquestradorService.__new__(OrquestradorService) + service.state = state + service.normalizer = EntityNormalizer() + service.policy = ConversationPolicy(service=service) + service.history_service = SimpleNamespace(record_turn=lambda **kwargs: history_calls.append(kwargs)) + service._empty_extraction_payload = service.normalizer.empty_extraction_payload + service._log_turn_event = lambda *args, **kwargs: None + service._compose_order_aware_response = lambda response, user_id, queue_notice=None: response + + async def fake_maybe_auto_advance_next_order(base_response: str, user_id: int | None): + return base_response + + service._maybe_auto_advance_next_order = fake_maybe_auto_advance_next_order + service._upsert_user_context = lambda user_id: None + + async def fake_extract_turn_decision(message: str, user_id: int | None): + return { + "intent": "general", + "domain": "general", + "action": "answer_user", + "entities": service.normalizer.empty_extraction_payload(), + "missing_fields": [], + "selection_index": None, + "tool_name": None, + "tool_arguments": {}, + "response_to_user": "Resposta direta do contrato.", + } + + service._extract_turn_decision_with_llm = fake_extract_turn_decision + + async def fake_try_handle_immediate_context_reset(**kwargs): + return None + + service._try_handle_immediate_context_reset = fake_try_handle_immediate_context_reset + + async def fake_try_resolve_pending_order_selection(**kwargs): + return None + + service._try_resolve_pending_order_selection = fake_try_resolve_pending_order_selection + + async def fake_try_continue_queued_order(**kwargs): + return None + + service._try_continue_queued_order = fake_try_continue_queued_order + + async def fake_extract_message_plan(message: str, user_id: int | None): + return { + "orders": [ + { + "domain": "general", + "message": message, + "entities": service.normalizer.empty_extraction_payload(), + } + ] + } + + service._extract_message_plan_with_llm = fake_extract_message_plan + service._prepare_message_for_single_order = lambda message, user_id, routing_plan=None: (message, None, None) + service._resolve_entities_for_message_plan = lambda message_plan, routed_message: service.normalizer.empty_extraction_payload() + + async def fake_extract_entities(message: str, user_id: int | None): + return service.normalizer.empty_extraction_payload() + + service._extract_entities_with_llm = fake_extract_entities + + async def fake_extract_missing_sales_search_context_with_llm(**kwargs): + return {} + + service._extract_missing_sales_search_context_with_llm = fake_extract_missing_sales_search_context_with_llm + service._domain_from_intents = lambda intents: "general" + service._handle_context_switch = lambda **kwargs: None + service._update_active_domain = lambda **kwargs: None + + async def fake_try_execute_orchestration_control_tool(**kwargs): + return None + + service._try_execute_orchestration_control_tool = fake_try_execute_orchestration_control_tool + + async def fake_try_execute_business_tool_from_turn_decision(**kwargs): + return None + + service._try_execute_business_tool_from_turn_decision = fake_try_execute_business_tool_from_turn_decision + + async def fake_try_handle_review_management(**kwargs): + return None + + service._try_handle_review_management = fake_try_handle_review_management + + async def fake_try_confirm_pending_review(**kwargs): + return None + + service._try_confirm_pending_review = fake_try_confirm_pending_review + + async def fake_try_collect_and_schedule_review(**kwargs): + return None + + service._try_collect_and_schedule_review = fake_try_collect_and_schedule_review + + async def fake_try_collect_and_cancel_order(**kwargs): + return None + + service._try_collect_and_cancel_order = fake_try_collect_and_cancel_order + + async def fake_try_handle_order_listing(**kwargs): + return None + + service._try_handle_order_listing = fake_try_handle_order_listing + + async def fake_try_collect_and_create_order(**kwargs): + return None + + service._try_collect_and_create_order = fake_try_collect_and_create_order + + response = await service.handle_message( + "ola", + user_id=1, + ) + + self.assertEqual(response, "Resposta direta do contrato.") + self.assertEqual(len(history_calls), 1) + self.assertEqual(history_calls[0]["user_message"], "ola") + self.assertEqual(history_calls[0]["assistant_response"], "Resposta direta do contrato.") + self.assertEqual(history_calls[0]["turn_status"], "completed") + self.assertEqual(history_calls[0]["intent"], "general") + + async def test_handle_message_persists_failed_turn_history(self): + state = FakeState( + contexts={ + 1: { + "active_domain": "general", + "generic_memory": {}, + "shared_memory": {}, + "order_queue": [], + "pending_order_selection": None, + "pending_switch": None, + "last_stock_results": [], + "selected_vehicle": None, + } + } + ) + history_calls = [] + service = OrquestradorService.__new__(OrquestradorService) + service.state = state + service.normalizer = EntityNormalizer() + service.policy = ConversationPolicy(service=service) + service.history_service = SimpleNamespace(record_turn=lambda **kwargs: history_calls.append(kwargs)) + service._empty_extraction_payload = service.normalizer.empty_extraction_payload + service._log_turn_event = lambda *args, **kwargs: None + service._compose_order_aware_response = lambda response, user_id, queue_notice=None: response + + async def fake_maybe_auto_advance_next_order(base_response: str, user_id: int | None): + return base_response + + service._maybe_auto_advance_next_order = fake_maybe_auto_advance_next_order + service._upsert_user_context = lambda user_id: None + + async def fake_extract_turn_decision(message: str, user_id: int | None): + raise RuntimeError("falha controlada no turno") + + service._extract_turn_decision_with_llm = fake_extract_turn_decision + + with self.assertRaises(RuntimeError): + await service.handle_message( + "ola", + user_id=1, + ) + + self.assertEqual(len(history_calls), 1) + self.assertEqual(history_calls[0]["user_message"], "ola") + self.assertIsNone(history_calls[0]["assistant_response"]) + self.assertEqual(history_calls[0]["turn_status"], "failed") + self.assertIn("RuntimeError", history_calls[0]["error_detail"]) + self.assertIn("falha controlada no turno", history_calls[0]["error_detail"]) + async def test_handle_message_prioritizes_order_flow_over_model_answer_for_purchase_intent(self): state = FakeState( contexts={