From a412f9c674b1ed19238d5eaeb9b6eab1ca6e9596 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Vitor=20Hugo=20Belorio=20Sim=C3=A3o?= Date: Fri, 6 Mar 2026 09:18:42 -0300 Subject: [PATCH] :broom: cleanup(services): reorganiza estrutura interna em subpastas por responsabilidade Redistribui os arquivos de app/services em modulos mais claros, separando orchestration, flows, ai, tools e user sem alterar a logica de negocio ou o comportamento funcional do sistema. Ajusta os imports afetados em rotas, startup da aplicacao e integracao com Telegram para refletir a nova organizacao interna e manter o fluxo atual intacto. Objetivos da limpeza: - reduzir a sensacao de pasta deposito em app/services - tornar o papel de cada modulo mais explicito - melhorar manutencao e navegacao do projeto - preparar o codigo para crescimento futuro com menor acoplamento estrutural --- app/api/routes/chat.py | 2 +- app/api/routes/mock.py | 2 +- .../telegram_satellite_service.py | 6 +- app/main.py | 2 +- app/services/{ => ai}/llm_service.py | 0 app/services/flows/order_flow.py | 211 +++++ app/services/flows/review_flow.py | 303 +++++++ .../orchestration/conversation_state_store.py | 63 ++ .../orchestrator_config.py | 0 .../orquestrador_service.py | 805 ++---------------- app/services/orchestration/prompt_builders.py | 57 ++ .../orchestration/response_formatter.py | 137 +++ app/services/{ => tools}/handlers.py | 0 app/services/{ => tools}/tool_registry.py | 2 +- app/services/{ => user}/user_service.py | 0 15 files changed, 835 insertions(+), 755 deletions(-) rename app/services/{ => ai}/llm_service.py (100%) create mode 100644 app/services/flows/order_flow.py create mode 100644 app/services/flows/review_flow.py create mode 100644 app/services/orchestration/conversation_state_store.py rename app/services/{ => orchestration}/orchestrator_config.py (100%) rename app/services/{ => orchestration}/orquestrador_service.py (60%) create mode 100644 app/services/orchestration/prompt_builders.py create mode 100644 app/services/orchestration/response_formatter.py rename app/services/{ => tools}/handlers.py (100%) rename app/services/{ => tools}/tool_registry.py (98%) rename app/services/{ => user}/user_service.py (100%) diff --git a/app/api/routes/chat.py b/app/api/routes/chat.py index bbb23db..2746a88 100644 --- a/app/api/routes/chat.py +++ b/app/api/routes/chat.py @@ -4,7 +4,7 @@ from sqlalchemy.orm import Session from app.api.routes.dependencies import db_error_detail, get_db from app.api.schemas import ChatRequest, ChatResponse -from app.services.orquestrador_service import OrquestradorService +from app.services.orchestration.orquestrador_service import OrquestradorService router = APIRouter(tags=["Chat"]) diff --git a/app/api/routes/mock.py b/app/api/routes/mock.py index 13774a1..786067f 100644 --- a/app/api/routes/mock.py +++ b/app/api/routes/mock.py @@ -15,7 +15,7 @@ from app.api.schemas import ( RealizarPedidoRequest, ValidarClienteVendaRequest, ) -from app.services.handlers import ( +from app.services.tools.handlers import ( agendar_revisao, avaliar_veiculo_troca, cancelar_agendamento_revisao, diff --git a/app/integrations/telegram_satellite_service.py b/app/integrations/telegram_satellite_service.py index 2688e03..12e79dd 100644 --- a/app/integrations/telegram_satellite_service.py +++ b/app/integrations/telegram_satellite_service.py @@ -10,9 +10,9 @@ from fastapi import HTTPException from app.core.settings import settings from app.db.database import SessionLocal from app.db.mock_database import SessionMockLocal -from app.services.llm_service import LLMService -from app.services.orquestrador_service import OrquestradorService -from app.services.user_service import UserService +from app.services.ai.llm_service import LLMService +from app.services.orchestration.orquestrador_service import OrquestradorService +from app.services.user.user_service import UserService logger = logging.getLogger(__name__) diff --git a/app/main.py b/app/main.py index 2fafe6f..2e27857 100644 --- a/app/main.py +++ b/app/main.py @@ -6,7 +6,7 @@ from app.db.database import Base, engine from app.db.mock_database import MockBase, mock_engine from app.db.models import Tool from app.db.mock_models import Customer, Order, ReviewSchedule, Vehicle -from app.services.llm_service import LLMService +from app.services.ai.llm_service import LLMService app = FastAPI(title="AI Orquestrador") diff --git a/app/services/llm_service.py b/app/services/ai/llm_service.py similarity index 100% rename from app/services/llm_service.py rename to app/services/ai/llm_service.py diff --git a/app/services/flows/order_flow.py b/app/services/flows/order_flow.py new file mode 100644 index 0000000..b259b4f --- /dev/null +++ b/app/services/flows/order_flow.py @@ -0,0 +1,211 @@ +import re +from datetime import datetime, timedelta + +from fastapi import HTTPException + +from app.services.orchestration.orchestrator_config import ( + CANCEL_ORDER_REQUIRED_FIELDS, + ORDER_REQUIRED_FIELDS, + PENDING_CANCEL_ORDER_DRAFT_TTL_MINUTES, + PENDING_ORDER_DRAFT_TTL_MINUTES, +) + + +class OrderFlowMixin: + def _is_valid_cpf(self, cpf: str) -> bool: + digits = re.sub(r"\D", "", cpf or "") + if len(digits) != 11: + return False + if digits == digits[0] * 11: + return False + + numbers = [int(d) for d in digits] + + sum_first = sum(n * w for n, w in zip(numbers[:9], range(10, 1, -1))) + first_digit = 11 - (sum_first % 11) + first_digit = 0 if first_digit >= 10 else first_digit + if first_digit != numbers[9]: + return False + + sum_second = sum(n * w for n, w in zip(numbers[:10], range(11, 1, -1))) + second_digit = 11 - (sum_second % 11) + second_digit = 0 if second_digit >= 10 else second_digit + return second_digit == numbers[10] + + def _try_prefill_order_value_from_memory(self, user_id: int | None, payload: dict) -> None: + if user_id is None or payload.get("valor_veiculo") is not None: + return + + context = self._get_user_context(user_id) + if not context: + return + memory = context.get("generic_memory", {}) + budget = memory.get("orcamento_max") + if isinstance(budget, (int, float)) and budget > 0: + payload["valor_veiculo"] = float(budget) + + def _render_missing_order_fields_prompt(self, missing_fields: list[str]) -> str: + labels = { + "cpf": "o CPF do cliente", + "valor_veiculo": "o valor do veiculo (R$)", + } + itens = [f"- {labels[field]}" for field in missing_fields] + return "Para realizar o pedido, preciso dos dados abaixo:\n" + "\n".join(itens) + + def _render_missing_cancel_order_fields_prompt(self, missing_fields: list[str]) -> str: + labels = { + "numero_pedido": "o numero do pedido (ex.: PED-20260305123456-ABC123)", + "motivo": "o motivo do cancelamento", + } + itens = [f"- {labels[field]}" for field in missing_fields] + return "Para cancelar o pedido, preciso dos dados abaixo:\n" + "\n".join(itens) + + async def _try_collect_and_create_order( + self, + message: str, + user_id: int | None, + extracted_fields: dict | None = None, + intents: dict | None = None, + ) -> str | None: + if user_id is None: + return None + + normalized_intents = self._normalize_intents(intents) + draft = self.state.get_entry("pending_order_drafts", user_id, expire=True) + + extracted = self._normalize_order_fields(extracted_fields) + has_intent = normalized_intents.get("order_create", False) + + if ( + draft + and not has_intent + and ( + normalized_intents.get("review_schedule", False) + or normalized_intents.get("review_list", False) + or normalized_intents.get("review_cancel", False) + or normalized_intents.get("review_reschedule", False) + or normalized_intents.get("order_cancel", False) + ) + and not extracted + ): + self.state.pop_entry("pending_order_drafts", user_id) + return None + + if not has_intent and draft is None: + return None + + if draft is None: + draft = { + "payload": {}, + "expires_at": datetime.utcnow() + timedelta(minutes=PENDING_ORDER_DRAFT_TTL_MINUTES), + } + + draft["payload"].update(extracted) + self._try_prefill_order_value_from_memory(user_id=user_id, payload=draft["payload"]) + + cpf_value = draft["payload"].get("cpf") + if cpf_value and not self._is_valid_cpf(str(cpf_value)): + draft["payload"].pop("cpf", None) + self.state.set_entry("pending_order_drafts", user_id, draft) + return "Para seguir com o pedido, preciso de um CPF valido com 11 digitos." + + valor = draft["payload"].get("valor_veiculo") + if valor is not None: + try: + parsed = float(valor) + if parsed <= 0: + draft["payload"].pop("valor_veiculo", None) + else: + draft["payload"]["valor_veiculo"] = round(parsed, 2) + except (TypeError, ValueError): + draft["payload"].pop("valor_veiculo", None) + + draft["expires_at"] = datetime.utcnow() + timedelta(minutes=PENDING_ORDER_DRAFT_TTL_MINUTES) + self.state.set_entry("pending_order_drafts", user_id, draft) + + missing = [field for field in ORDER_REQUIRED_FIELDS if field not in draft["payload"]] + if missing: + return self._render_missing_order_fields_prompt(missing) + + try: + tool_result = await self.registry.execute( + "realizar_pedido", + draft["payload"], + user_id=user_id, + ) + except HTTPException as exc: + return self._http_exception_detail(exc) + finally: + self.state.pop_entry("pending_order_drafts", user_id) + + return self._fallback_format_tool_result("realizar_pedido", tool_result) + + async def _try_collect_and_cancel_order( + self, + message: str, + user_id: int | None, + extracted_fields: dict | None = None, + intents: dict | None = None, + ) -> str | None: + if user_id is None: + return None + + normalized_intents = self._normalize_intents(intents) + draft = self.state.get_entry("pending_cancel_order_drafts", user_id, expire=True) + + extracted = self._normalize_cancel_order_fields(extracted_fields) + has_intent = normalized_intents.get("order_cancel", False) + + if ( + draft + and not has_intent + and ( + normalized_intents.get("review_schedule", False) + or normalized_intents.get("review_list", False) + or normalized_intents.get("review_cancel", False) + or normalized_intents.get("review_reschedule", False) + or normalized_intents.get("order_create", False) + ) + and not extracted + ): + self.state.pop_entry("pending_cancel_order_drafts", user_id) + return None + + if not has_intent and draft is None: + return None + + if draft is None: + draft = { + "payload": {}, + "expires_at": datetime.utcnow() + timedelta(minutes=PENDING_CANCEL_ORDER_DRAFT_TTL_MINUTES), + } + + if ( + "motivo" not in extracted + and draft["payload"].get("numero_pedido") + and not has_intent + ): + free_text = (message or "").strip() + if free_text and len(free_text) >= 4: + extracted["motivo"] = free_text + + draft["payload"].update(extracted) + draft["expires_at"] = datetime.utcnow() + timedelta(minutes=PENDING_CANCEL_ORDER_DRAFT_TTL_MINUTES) + self.state.set_entry("pending_cancel_order_drafts", user_id, draft) + + missing = [field for field in CANCEL_ORDER_REQUIRED_FIELDS if field not in draft["payload"]] + if missing: + return self._render_missing_cancel_order_fields_prompt(missing) + + try: + tool_result = await self.registry.execute( + "cancelar_pedido", + draft["payload"], + user_id=user_id, + ) + except HTTPException as exc: + return self._http_exception_detail(exc) + finally: + self.state.pop_entry("pending_cancel_order_drafts", user_id) + + return self._fallback_format_tool_result("cancelar_pedido", tool_result) diff --git a/app/services/flows/review_flow.py b/app/services/flows/review_flow.py new file mode 100644 index 0000000..a042205 --- /dev/null +++ b/app/services/flows/review_flow.py @@ -0,0 +1,303 @@ +import re +from datetime import datetime, timedelta + +from fastapi import HTTPException + +from app.services.orchestration.orchestrator_config import ( + LAST_REVIEW_PACKAGE_TTL_MINUTES, + PENDING_REVIEW_DRAFT_TTL_MINUTES, + REVIEW_REQUIRED_FIELDS, +) + + +class ReviewFlowMixin: + async def _try_handle_review_management( + self, + message: str, + user_id: int | None, + extracted_fields: dict | None = None, + intents: dict | None = None, + ) -> str | None: + if user_id is None: + return None + normalized_intents = self._normalize_intents(intents) + draft = self.state.get_entry("pending_review_management_drafts", user_id, expire=True) + + has_list_intent = normalized_intents.get("review_list", False) + has_cancel_intent = normalized_intents.get("review_cancel", False) + has_reschedule_intent = normalized_intents.get("review_reschedule", False) + + if has_list_intent: + self._reset_pending_review_states(user_id=user_id) + try: + tool_result = await self.registry.execute( + "listar_agendamentos_revisao", + {"limite": 20}, + user_id=user_id, + ) + except HTTPException as exc: + return self._http_exception_detail(exc) + return self._fallback_format_tool_result("listar_agendamentos_revisao", tool_result) + + if not has_cancel_intent and not has_reschedule_intent and draft is None: + return None + + if draft is None: + action = "reschedule" if has_reschedule_intent else "cancel" + draft = { + "action": action, + "payload": {}, + "expires_at": datetime.utcnow() + timedelta(minutes=PENDING_REVIEW_DRAFT_TTL_MINUTES), + } + else: + if has_reschedule_intent: + draft["action"] = "reschedule" + elif has_cancel_intent: + draft["action"] = "cancel" + + extracted = self._normalize_review_management_fields(extracted_fields) + if "protocolo" not in extracted: + inferred_protocol = self._extract_review_protocol_from_text(message) + if inferred_protocol: + extracted["protocolo"] = inferred_protocol + + action = draft.get("action", "cancel") + if ( + action == "cancel" + and "motivo" not in extracted + and draft["payload"].get("protocolo") + and not has_cancel_intent + ): + free_text = str(message or "").strip() + if free_text and len(free_text) >= 4 and not self._is_affirmative_message(free_text): + extracted["motivo"] = free_text + + draft["payload"].update(extracted) + draft["expires_at"] = datetime.utcnow() + timedelta(minutes=PENDING_REVIEW_DRAFT_TTL_MINUTES) + self.state.set_entry("pending_review_management_drafts", user_id, draft) + + if action == "reschedule": + missing = [field for field in ("protocolo", "nova_data_hora") if field not in draft["payload"]] + if missing: + return self._render_missing_review_reschedule_fields_prompt(missing) + try: + tool_result = await self.registry.execute( + "editar_data_revisao", + { + "protocolo": draft["payload"]["protocolo"], + "nova_data_hora": draft["payload"]["nova_data_hora"], + }, + user_id=user_id, + ) + except HTTPException as exc: + return self._http_exception_detail(exc) + finally: + self.state.pop_entry("pending_review_management_drafts", user_id) + return self._fallback_format_tool_result("editar_data_revisao", tool_result) + + missing = [field for field in ("protocolo",) if field not in draft["payload"]] + if missing: + return self._render_missing_review_cancel_fields_prompt(missing) + try: + tool_result = await self.registry.execute( + "cancelar_agendamento_revisao", + { + "protocolo": draft["payload"]["protocolo"], + "motivo": draft["payload"].get("motivo"), + }, + user_id=user_id, + ) + except HTTPException as exc: + return self._http_exception_detail(exc) + finally: + self.state.pop_entry("pending_review_management_drafts", user_id) + return self._fallback_format_tool_result("cancelar_agendamento_revisao", tool_result) + + def _render_missing_review_fields_prompt(self, missing_fields: list[str]) -> str: + labels = { + "placa": "a placa do veiculo", + "data_hora": "a data e hora desejada para a revisao", + "modelo": "o modelo do veiculo", + "ano": "o ano do veiculo", + "km": "a quilometragem atual (km)", + "revisao_previa_concessionaria": "se ja fez revisao na concessionaria (sim/nao)", + } + itens = [f"- {labels[field]}" for field in missing_fields] + return "Para agendar sua revisao, preciso dos dados abaixo:\n" + "\n".join(itens) + + def _render_missing_review_cancel_fields_prompt(self, missing_fields: list[str]) -> str: + labels = { + "protocolo": "o protocolo da revisao (ex.: REV-20260310-ABC12345)", + } + itens = [f"- {labels[field]}" for field in missing_fields] + return "Para cancelar o agendamento de revisao, preciso dos dados abaixo:\n" + "\n".join(itens) + + def _render_missing_review_reschedule_fields_prompt(self, missing_fields: list[str]) -> str: + labels = { + "protocolo": "o protocolo da revisao (ex.: REV-20260310-ABC12345)", + "nova_data_hora": "a nova data e hora desejada para a revisao", + } + itens = [f"- {labels[field]}" for field in missing_fields] + return "Para remarcar sua revisao, preciso dos dados abaixo:\n" + "\n".join(itens) + + def _render_review_reuse_question(self) -> str: + return ( + "Deseja usar os mesmos dados do ultimo veiculo e informar so a data/hora da revisao? " + "(sim/nao)" + ) + + def _store_last_review_package(self, user_id: int | None, payload: dict | None) -> None: + if user_id is None or not isinstance(payload, dict): + return + package = { + "placa": payload.get("placa"), + "modelo": payload.get("modelo"), + "ano": payload.get("ano"), + "km": payload.get("km"), + "revisao_previa_concessionaria": payload.get("revisao_previa_concessionaria"), + } + sanitized = {k: v for k, v in package.items() if v is not None} + required = {"placa", "modelo", "ano", "km", "revisao_previa_concessionaria"} + if not required.issubset(sanitized.keys()): + return + self.state.set_entry( + "last_review_packages", + user_id, + { + "payload": sanitized, + "expires_at": datetime.utcnow() + timedelta(minutes=LAST_REVIEW_PACKAGE_TTL_MINUTES), + }, + ) + + def _get_last_review_package(self, user_id: int | None) -> dict | None: + if user_id is None: + return None + cached = self.state.get_entry("last_review_packages", user_id, expire=True) + if not cached: + return None + payload = cached.get("payload") + return dict(payload) if isinstance(payload, dict) else None + + async def _try_collect_and_schedule_review( + self, + message: str, + user_id: int | None, + extracted_fields: dict | None = None, + intents: dict | None = None, + ) -> str | None: + if user_id is None: + return None + + normalized_intents = self._normalize_intents(intents) + has_intent = normalized_intents.get("review_schedule", False) + has_management_intent = ( + normalized_intents.get("review_list", False) + or normalized_intents.get("review_cancel", False) + or normalized_intents.get("review_reschedule", False) + ) + + if has_management_intent: + self.state.pop_entry("pending_review_drafts", user_id) + self.state.pop_entry("pending_review_reuse_confirmations", user_id) + return None + + draft = self.state.get_entry("pending_review_drafts", user_id, expire=True) + extracted = self._normalize_review_fields(extracted_fields) + pending_reuse = self.state.get_entry("pending_review_reuse_confirmations", user_id, expire=True) + + if pending_reuse: + should_reuse = False + if self._is_negative_message(message): + self.state.pop_entry("pending_review_reuse_confirmations", user_id) + pending_reuse = None + elif self._is_affirmative_message(message) or "data_hora" in extracted: + should_reuse = True + else: + return self._render_review_reuse_question() + + if should_reuse: + seed_payload = dict(pending_reuse.get("payload") or {}) + if draft is None: + draft = { + "payload": seed_payload, + "expires_at": datetime.utcnow() + timedelta(minutes=PENDING_REVIEW_DRAFT_TTL_MINUTES), + } + else: + for key, value in seed_payload.items(): + draft["payload"].setdefault(key, value) + self.state.pop_entry("pending_review_reuse_confirmations", user_id) + if "data_hora" not in extracted: + self.state.set_entry("pending_review_drafts", user_id, draft) + return "Perfeito. Me informe apenas a data e hora desejada para a revisao." + + if has_intent and draft is None and not extracted: + last_package = self._get_last_review_package(user_id=user_id) + if last_package: + self.state.set_entry( + "pending_review_reuse_confirmations", + user_id, + { + "payload": last_package, + "expires_at": datetime.utcnow() + timedelta(minutes=PENDING_REVIEW_DRAFT_TTL_MINUTES), + }, + ) + return self._render_review_reuse_question() + + if ( + draft + and not has_intent + and ( + normalized_intents.get("order_create", False) + or normalized_intents.get("order_cancel", False) + ) + and not extracted + ): + self.state.pop_entry("pending_review_drafts", user_id) + return None + + if not has_intent and draft is None: + return None + + if draft is None: + draft = { + "payload": {}, + "expires_at": datetime.utcnow() + timedelta(minutes=PENDING_REVIEW_DRAFT_TTL_MINUTES), + } + + draft["payload"].update(extracted) + self._try_prefill_review_fields_from_memory(user_id=user_id, payload=draft["payload"]) + if ( + "revisao_previa_concessionaria" not in draft["payload"] + and draft["payload"] + and not extracted + ): + if self._is_affirmative_message(message): + draft["payload"]["revisao_previa_concessionaria"] = True + elif self._is_negative_message(message): + draft["payload"]["revisao_previa_concessionaria"] = False + draft["expires_at"] = datetime.utcnow() + timedelta(minutes=PENDING_REVIEW_DRAFT_TTL_MINUTES) + self.state.set_entry("pending_review_drafts", user_id, draft) + + missing = [field for field in REVIEW_REQUIRED_FIELDS if field not in draft["payload"]] + if missing: + return self._render_missing_review_fields_prompt(missing) + + try: + tool_result = await self.registry.execute( + "agendar_revisao", + draft["payload"], + user_id=user_id, + ) + except HTTPException as exc: + self._capture_review_confirmation_suggestion( + tool_name="agendar_revisao", + arguments=draft["payload"], + exc=exc, + user_id=user_id, + ) + return self._http_exception_detail(exc) + finally: + self.state.pop_entry("pending_review_drafts", user_id) + + self._store_last_review_package(user_id=user_id, payload=draft["payload"]) + return self._fallback_format_tool_result("agendar_revisao", tool_result) diff --git a/app/services/orchestration/conversation_state_store.py b/app/services/orchestration/conversation_state_store.py new file mode 100644 index 0000000..bd96602 --- /dev/null +++ b/app/services/orchestration/conversation_state_store.py @@ -0,0 +1,63 @@ +from datetime import datetime, timedelta + + +class ConversationStateStore: + def __init__(self) -> None: + self.user_contexts: dict[int, dict] = {} + self.pending_review_confirmations: dict[int, dict] = {} + self.pending_review_drafts: dict[int, dict] = {} + self.pending_review_management_drafts: dict[int, dict] = {} + self.last_review_packages: dict[int, dict] = {} + self.pending_review_reuse_confirmations: dict[int, dict] = {} + self.pending_order_drafts: dict[int, dict] = {} + self.pending_cancel_order_drafts: dict[int, dict] = {} + + def upsert_user_context(self, user_id: int | None, ttl_minutes: int) -> None: + if user_id is None: + return + now = datetime.utcnow() + context = self.user_contexts.get(user_id) + if context and context["expires_at"] >= now: + context["expires_at"] = now + timedelta(minutes=ttl_minutes) + return + self.user_contexts[user_id] = { + "active_domain": "general", + "generic_memory": {}, + "shared_memory": {}, + "order_queue": [], + "pending_switch": None, + "expires_at": now + timedelta(minutes=ttl_minutes), + } + + def get_user_context(self, user_id: int | None) -> dict | None: + if user_id is None: + return None + context = self.user_contexts.get(user_id) + if not context: + return None + if context["expires_at"] < datetime.utcnow(): + self.user_contexts.pop(user_id, None) + return None + return context + + def get_entry(self, bucket: str, user_id: int | None, *, expire: bool = False) -> dict | None: + if user_id is None: + return None + entries = getattr(self, bucket) + entry = entries.get(user_id) + if not entry: + return None + if expire and entry.get("expires_at") and entry["expires_at"] < datetime.utcnow(): + entries.pop(user_id, None) + return None + return entry + + def set_entry(self, bucket: str, user_id: int | None, value: dict) -> None: + if user_id is None: + return + getattr(self, bucket)[user_id] = value + + def pop_entry(self, bucket: str, user_id: int | None) -> dict | None: + if user_id is None: + return None + return getattr(self, bucket).pop(user_id, None) diff --git a/app/services/orchestrator_config.py b/app/services/orchestration/orchestrator_config.py similarity index 100% rename from app/services/orchestrator_config.py rename to app/services/orchestration/orchestrator_config.py diff --git a/app/services/orquestrador_service.py b/app/services/orchestration/orquestrador_service.py similarity index 60% rename from app/services/orquestrador_service.py rename to app/services/orchestration/orquestrador_service.py index 1d98d0a..172b294 100644 --- a/app/services/orquestrador_service.py +++ b/app/services/orchestration/orquestrador_service.py @@ -7,37 +7,29 @@ from datetime import datetime, timedelta from fastapi import HTTPException from sqlalchemy.orm import Session -from app.services.orchestrator_config import ( - CANCEL_ORDER_REQUIRED_FIELDS, +from app.services.orchestration.orchestrator_config import ( DETERMINISTIC_RESPONSE_TOOLS, LOW_VALUE_RESPONSES, - LAST_REVIEW_PACKAGE_TTL_MINUTES, - ORDER_REQUIRED_FIELDS, - PENDING_CANCEL_ORDER_DRAFT_TTL_MINUTES, - PENDING_ORDER_DRAFT_TTL_MINUTES, - PENDING_REVIEW_DRAFT_TTL_MINUTES, PENDING_REVIEW_TTL_MINUTES, - REVIEW_REQUIRED_FIELDS, USER_CONTEXT_TTL_MINUTES, ) -from app.services.llm_service import LLMService -from app.services.tool_registry import ToolRegistry +from app.services.orchestration.conversation_state_store import ConversationStateStore +from app.services.ai.llm_service import LLMService +from app.services.flows.order_flow import OrderFlowMixin +from app.services.orchestration.prompt_builders import ( + build_force_tool_prompt, + build_result_prompt, + build_router_prompt, +) +from app.services.flows.review_flow import ReviewFlowMixin +from app.services.orchestration.response_formatter import fallback_format_tool_result +from app.services.tools.tool_registry import ToolRegistry logger = logging.getLogger(__name__) -class OrquestradorService: - USER_CONTEXTS: dict[int, dict] = {} - - # Memoria temporaria de confirmacao quando a API sugere novo horario (conflito 409). - PENDING_REVIEW_CONFIRMATIONS: dict[int, dict] = {} - # Rascunho por usuario para juntar dados de revisao enviados em mensagens separadas. - PENDING_REVIEW_DRAFTS: dict[int, dict] = {} - PENDING_REVIEW_MANAGEMENT_DRAFTS: dict[int, dict] = {} - LAST_REVIEW_PACKAGES: dict[int, dict] = {} - PENDING_REVIEW_REUSE_CONFIRMATIONS: dict[int, dict] = {} - PENDING_ORDER_DRAFTS: dict[int, dict] = {} - PENDING_CANCEL_ORDER_DRAFTS: dict[int, dict] = {} +class OrquestradorService(ReviewFlowMixin, OrderFlowMixin): + state = ConversationStateStore() def __init__(self, db: Session): """Inicializa servicos de LLM e registro de tools para a sessao atual.""" @@ -236,45 +228,23 @@ class OrquestradorService: def _reset_pending_review_states(self, user_id: int | None) -> None: if user_id is None: return - self.PENDING_REVIEW_DRAFTS.pop(user_id, None) - self.PENDING_REVIEW_CONFIRMATIONS.pop(user_id, None) - self.PENDING_REVIEW_MANAGEMENT_DRAFTS.pop(user_id, None) - self.PENDING_REVIEW_REUSE_CONFIRMATIONS.pop(user_id, None) + self.state.pop_entry("pending_review_drafts", user_id) + self.state.pop_entry("pending_review_confirmations", user_id) + self.state.pop_entry("pending_review_management_drafts", user_id) + self.state.pop_entry("pending_review_reuse_confirmations", user_id) def _reset_pending_order_states(self, user_id: int | None) -> None: if user_id is None: return - self.PENDING_ORDER_DRAFTS.pop(user_id, None) - self.PENDING_CANCEL_ORDER_DRAFTS.pop(user_id, None) + self.state.pop_entry("pending_order_drafts", user_id) + self.state.pop_entry("pending_cancel_order_drafts", user_id) # Nessa função é onde eu configuro a memória volátil do sistema def _upsert_user_context(self, user_id: int | None) -> None: - if user_id is None: - return - now = datetime.utcnow() - context = self.USER_CONTEXTS.get(user_id) - if context and context["expires_at"] >= now: - context["expires_at"] = now + timedelta(minutes=USER_CONTEXT_TTL_MINUTES) - return - self.USER_CONTEXTS[user_id] = { - "active_domain": "general", - "generic_memory": {}, - "shared_memory": {}, - "order_queue": [], - "pending_switch": None, - "expires_at": now + timedelta(minutes=USER_CONTEXT_TTL_MINUTES), - } + self.state.upsert_user_context(user_id=user_id, ttl_minutes=USER_CONTEXT_TTL_MINUTES) def _get_user_context(self, user_id: int | None) -> dict | None: - if user_id is None: - return None - context = self.USER_CONTEXTS.get(user_id) - if not context: - return None - if context["expires_at"] < datetime.utcnow(): - self.USER_CONTEXTS.pop(user_id, None) - return None - return context + return self.state.get_user_context(user_id) def _extract_generic_memory_fields(self, llm_generic_fields: dict | None = None) -> dict: extracted: dict = {} @@ -907,13 +877,13 @@ class OrquestradorService: def _render_open_flow_prompt(self, user_id: int | None, domain: str) -> str: if domain == "review" and user_id is not None: - draft = self.PENDING_REVIEW_DRAFTS.get(user_id) + draft = self.state.get_entry("pending_review_drafts", user_id, expire=True) if draft: missing = [field for field in REVIEW_REQUIRED_FIELDS if field not in draft.get("payload", {})] if missing: return self._render_missing_review_fields_prompt(missing) - management_draft = self.PENDING_REVIEW_MANAGEMENT_DRAFTS.get(user_id) + management_draft = self.state.get_entry("pending_review_management_drafts", user_id, expire=True) if management_draft: action = management_draft.get("action", "cancel") payload = management_draft.get("payload", {}) @@ -926,19 +896,19 @@ class OrquestradorService: if missing: return self._render_missing_review_cancel_fields_prompt(missing) - pending = self.PENDING_REVIEW_CONFIRMATIONS.get(user_id) + pending = self.state.get_entry("pending_review_confirmations", user_id, expire=True) if pending: return "Antes de mudar de assunto, me confirme se podemos concluir seu agendamento de revisao." - reuse_pending = self.PENDING_REVIEW_REUSE_CONFIRMATIONS.get(user_id) + reuse_pending = self.state.get_entry("pending_review_reuse_confirmations", user_id, expire=True) if reuse_pending: return self._render_review_reuse_question() if domain == "sales" and user_id is not None: - draft = self.PENDING_ORDER_DRAFTS.get(user_id) + draft = self.state.get_entry("pending_order_drafts", user_id, expire=True) if draft: missing = [field for field in ORDER_REQUIRED_FIELDS if field not in draft.get("payload", {})] if missing: return self._render_missing_order_fields_prompt(missing) - cancel_draft = self.PENDING_CANCEL_ORDER_DRAFTS.get(user_id) + cancel_draft = self.state.get_entry("pending_cancel_order_drafts", user_id, expire=True) if cancel_draft: missing = [field for field in CANCEL_ORDER_REQUIRED_FIELDS if field not in cancel_draft.get("payload", {})] if missing: @@ -1043,15 +1013,15 @@ class OrquestradorService: return False if domain == "review": return bool( - self.PENDING_REVIEW_DRAFTS.get(user_id) - or self.PENDING_REVIEW_CONFIRMATIONS.get(user_id) - or self.PENDING_REVIEW_MANAGEMENT_DRAFTS.get(user_id) - or self.PENDING_REVIEW_REUSE_CONFIRMATIONS.get(user_id) + self.state.get_entry("pending_review_drafts", user_id, expire=True) + or self.state.get_entry("pending_review_confirmations", user_id, expire=True) + or self.state.get_entry("pending_review_management_drafts", user_id, expire=True) + or self.state.get_entry("pending_review_reuse_confirmations", user_id, expire=True) ) if domain == "sales": return bool( - self.PENDING_ORDER_DRAFTS.get(user_id) - or self.PENDING_CANCEL_ORDER_DRAFTS.get(user_id) + self.state.get_entry("pending_order_drafts", user_id, expire=True) + or self.state.get_entry("pending_cancel_order_drafts", user_id, expire=True) ) return False @@ -1158,522 +1128,6 @@ class OrquestradorService: def _is_low_value_response(self, text: str) -> bool: return text.strip().lower() in LOW_VALUE_RESPONSES - async def _try_handle_review_management( - self, - message: str, - user_id: int | None, - extracted_fields: dict | None = None, - intents: dict | None = None, - ) -> str | None: - if user_id is None: - return None - normalized_intents = self._normalize_intents(intents) - draft = self.PENDING_REVIEW_MANAGEMENT_DRAFTS.get(user_id) - if draft and draft["expires_at"] < datetime.utcnow(): - self.PENDING_REVIEW_MANAGEMENT_DRAFTS.pop(user_id, None) - draft = None - - has_list_intent = normalized_intents.get("review_list", False) - has_cancel_intent = normalized_intents.get("review_cancel", False) - has_reschedule_intent = normalized_intents.get("review_reschedule", False) - - if has_list_intent: - # Listagem e acao terminal; limpa rascunhos de revisao para evitar conflito de contexto. - self._reset_pending_review_states(user_id=user_id) - try: - tool_result = await self.registry.execute( - "listar_agendamentos_revisao", - {"limite": 20}, - user_id=user_id, - ) - except HTTPException as exc: - return self._http_exception_detail(exc) - return self._fallback_format_tool_result("listar_agendamentos_revisao", tool_result) - - if not has_cancel_intent and not has_reschedule_intent and draft is None: - return None - - if draft is None: - action = "reschedule" if has_reschedule_intent else "cancel" - draft = { - "action": action, - "payload": {}, - "expires_at": datetime.utcnow() + timedelta(minutes=PENDING_REVIEW_DRAFT_TTL_MINUTES), - } - else: - if has_reschedule_intent: - draft["action"] = "reschedule" - elif has_cancel_intent: - draft["action"] = "cancel" - - extracted = self._normalize_review_management_fields(extracted_fields) - if "protocolo" not in extracted: - inferred_protocol = self._extract_review_protocol_from_text(message) - if inferred_protocol: - extracted["protocolo"] = inferred_protocol - - action = draft.get("action", "cancel") - if ( - action == "cancel" - and "motivo" not in extracted - and draft["payload"].get("protocolo") - and not has_cancel_intent - ): - free_text = str(message or "").strip() - if free_text and len(free_text) >= 4 and not self._is_affirmative_message(free_text): - extracted["motivo"] = free_text - - draft["payload"].update(extracted) - draft["expires_at"] = datetime.utcnow() + timedelta(minutes=PENDING_REVIEW_DRAFT_TTL_MINUTES) - self.PENDING_REVIEW_MANAGEMENT_DRAFTS[user_id] = draft - - if action == "reschedule": - missing = [field for field in ("protocolo", "nova_data_hora") if field not in draft["payload"]] - if missing: - return self._render_missing_review_reschedule_fields_prompt(missing) - try: - tool_result = await self.registry.execute( - "editar_data_revisao", - { - "protocolo": draft["payload"]["protocolo"], - "nova_data_hora": draft["payload"]["nova_data_hora"], - }, - user_id=user_id, - ) - except HTTPException as exc: - return self._http_exception_detail(exc) - finally: - self.PENDING_REVIEW_MANAGEMENT_DRAFTS.pop(user_id, None) - return self._fallback_format_tool_result("editar_data_revisao", tool_result) - - missing = [field for field in ("protocolo",) if field not in draft["payload"]] - if missing: - return self._render_missing_review_cancel_fields_prompt(missing) - try: - tool_result = await self.registry.execute( - "cancelar_agendamento_revisao", - { - "protocolo": draft["payload"]["protocolo"], - "motivo": draft["payload"].get("motivo"), - }, - user_id=user_id, - ) - except HTTPException as exc: - return self._http_exception_detail(exc) - finally: - self.PENDING_REVIEW_MANAGEMENT_DRAFTS.pop(user_id, None) - return self._fallback_format_tool_result("cancelar_agendamento_revisao", tool_result) - - def _render_missing_review_fields_prompt(self, missing_fields: list[str]) -> str: - labels = { - "placa": "a placa do veiculo", - "data_hora": "a data e hora desejada para a revisao", - "modelo": "o modelo do veiculo", - "ano": "o ano do veiculo", - "km": "a quilometragem atual (km)", - "revisao_previa_concessionaria": "se ja fez revisao na concessionaria (sim/nao)", - } - itens = [f"- {labels[field]}" for field in missing_fields] - return "Para agendar sua revisao, preciso dos dados abaixo:\n" + "\n".join(itens) - - def _render_missing_review_cancel_fields_prompt(self, missing_fields: list[str]) -> str: - labels = { - "protocolo": "o protocolo da revisao (ex.: REV-20260310-ABC12345)", - } - itens = [f"- {labels[field]}" for field in missing_fields] - return "Para cancelar o agendamento de revisao, preciso dos dados abaixo:\n" + "\n".join(itens) - - def _render_missing_review_reschedule_fields_prompt(self, missing_fields: list[str]) -> str: - labels = { - "protocolo": "o protocolo da revisao (ex.: REV-20260310-ABC12345)", - "nova_data_hora": "a nova data e hora desejada para a revisao", - } - itens = [f"- {labels[field]}" for field in missing_fields] - return "Para remarcar sua revisao, preciso dos dados abaixo:\n" + "\n".join(itens) - - def _render_review_reuse_question(self) -> str: - return ( - "Deseja usar os mesmos dados do ultimo veiculo e informar so a data/hora da revisao? " - "(sim/nao)" - ) - - def _store_last_review_package(self, user_id: int | None, payload: dict | None) -> None: - if user_id is None or not isinstance(payload, dict): - return - package = { - "placa": payload.get("placa"), - "modelo": payload.get("modelo"), - "ano": payload.get("ano"), - "km": payload.get("km"), - "revisao_previa_concessionaria": payload.get("revisao_previa_concessionaria"), - } - sanitized = {k: v for k, v in package.items() if v is not None} - required = {"placa", "modelo", "ano", "km", "revisao_previa_concessionaria"} - if not required.issubset(sanitized.keys()): - return - self.LAST_REVIEW_PACKAGES[user_id] = { - "payload": sanitized, - "expires_at": datetime.utcnow() + timedelta(minutes=LAST_REVIEW_PACKAGE_TTL_MINUTES), - } - - def _get_last_review_package(self, user_id: int | None) -> dict | None: - if user_id is None: - return None - cached = self.LAST_REVIEW_PACKAGES.get(user_id) - if not cached: - return None - if cached["expires_at"] < datetime.utcnow(): - self.LAST_REVIEW_PACKAGES.pop(user_id, None) - return None - payload = cached.get("payload") - return dict(payload) if isinstance(payload, dict) else None - - def _is_valid_cpf(self, cpf: str) -> bool: - digits = re.sub(r"\D", "", cpf or "") - if len(digits) != 11: - return False - if digits == digits[0] * 11: - return False - - numbers = [int(d) for d in digits] - - sum_first = sum(n * w for n, w in zip(numbers[:9], range(10, 1, -1))) - first_digit = 11 - (sum_first % 11) - first_digit = 0 if first_digit >= 10 else first_digit - if first_digit != numbers[9]: - return False - - sum_second = sum(n * w for n, w in zip(numbers[:10], range(11, 1, -1))) - second_digit = 11 - (sum_second % 11) - second_digit = 0 if second_digit >= 10 else second_digit - return second_digit == numbers[10] - - def _try_prefill_order_value_from_memory(self, user_id: int | None, payload: dict) -> None: - # So preenche quando o usuario ainda nao informou valor explicitamente no fluxo atual. - if user_id is None or payload.get("valor_veiculo") is not None: - return - - context = self._get_user_context(user_id) - if not context: - return - memory = context.get("generic_memory", {}) - budget = memory.get("orcamento_max") - if isinstance(budget, (int, float)) and budget > 0: - # Reaproveita o orcamento capturado anteriormente como valor base do pedido. - payload["valor_veiculo"] = float(budget) - - def _render_missing_order_fields_prompt(self, missing_fields: list[str]) -> str: - labels = { - "cpf": "o CPF do cliente", - "valor_veiculo": "o valor do veiculo (R$)", - } - itens = [f"- {labels[field]}" for field in missing_fields] - return "Para realizar o pedido, preciso dos dados abaixo:\n" + "\n".join(itens) - - def _render_missing_cancel_order_fields_prompt(self, missing_fields: list[str]) -> str: - labels = { - "numero_pedido": "o numero do pedido (ex.: PED-20260305123456-ABC123)", - "motivo": "o motivo do cancelamento", - } - itens = [f"- {labels[field]}" for field in missing_fields] - return "Para cancelar o pedido, preciso dos dados abaixo:\n" + "\n".join(itens) - - # Em vez de tentar entender tudo de uma vez, o bot mantem um "estado" do que ja sabe e vai perguntando apenas o que falta (os "slots" vazios) ate que a tarefa possa ser completada. - async def _try_collect_and_schedule_review( - self, - message: str, - user_id: int | None, - extracted_fields: dict | None = None, - intents: dict | None = None, - ) -> str | None: - if user_id is None: - return None - - normalized_intents = self._normalize_intents(intents) - has_intent = normalized_intents.get("review_schedule", False) - has_management_intent = ( - normalized_intents.get("review_list", False) - or normalized_intents.get("review_cancel", False) - or normalized_intents.get("review_reschedule", False) - ) - - # Nao inicia slot-filling quando a intencao atual nao e de agendamento. - if has_management_intent: - # Se o usuario mudou para gerenciamento de revisao, encerra - # qualquer coleta pendente de novo agendamento. - self.PENDING_REVIEW_DRAFTS.pop(user_id, None) - self.PENDING_REVIEW_REUSE_CONFIRMATIONS.pop(user_id, None) - return None - - # Reaproveita rascunho anterior do usuario, se ainda estiver valido. - draft = self.PENDING_REVIEW_DRAFTS.get(user_id) - if draft and draft["expires_at"] < datetime.utcnow(): - self.PENDING_REVIEW_DRAFTS.pop(user_id, None) - draft = None - - extracted = self._normalize_review_fields(extracted_fields) - pending_reuse = self.PENDING_REVIEW_REUSE_CONFIRMATIONS.get(user_id) - if pending_reuse and pending_reuse["expires_at"] < datetime.utcnow(): - self.PENDING_REVIEW_REUSE_CONFIRMATIONS.pop(user_id, None) - pending_reuse = None - - if pending_reuse: - should_reuse = False - if self._is_negative_message(message): - self.PENDING_REVIEW_REUSE_CONFIRMATIONS.pop(user_id, None) - pending_reuse = None - elif self._is_affirmative_message(message) or "data_hora" in extracted: - should_reuse = True - else: - return self._render_review_reuse_question() - - if should_reuse: - seed_payload = dict(pending_reuse.get("payload") or {}) - if draft is None: - draft = { - "payload": seed_payload, - "expires_at": datetime.utcnow() + timedelta(minutes=PENDING_REVIEW_DRAFT_TTL_MINUTES), - } - else: - for key, value in seed_payload.items(): - draft["payload"].setdefault(key, value) - self.PENDING_REVIEW_REUSE_CONFIRMATIONS.pop(user_id, None) - pending_reuse = None - if "data_hora" not in extracted: - self.PENDING_REVIEW_DRAFTS[user_id] = draft - return "Perfeito. Me informe apenas a data e hora desejada para a revisao." - - if has_intent and draft is None and not extracted: - last_package = self._get_last_review_package(user_id=user_id) - if last_package: - self.PENDING_REVIEW_REUSE_CONFIRMATIONS[user_id] = { - "payload": last_package, - "expires_at": datetime.utcnow() + timedelta(minutes=PENDING_REVIEW_DRAFT_TTL_MINUTES), - } - return self._render_review_reuse_question() - - # Se houver rascunho de revisao, mas o usuario mudou para outra - # intencao operacional (ex.: compra/estoque), descarta o rascunho. - if ( - draft - and not has_intent - and ( - normalized_intents.get("order_create", False) - or normalized_intents.get("order_cancel", False) - ) - and not extracted - ): - self.PENDING_REVIEW_DRAFTS.pop(user_id, None) - return None - - # Sem intencao de revisao e sem rascunho aberto: nao interfere no fluxo normal. - if not has_intent and draft is None: - return None - - if draft is None: - draft = {"payload": {}, "expires_at": datetime.utcnow() + timedelta(minutes=PENDING_REVIEW_DRAFT_TTL_MINUTES)} - - # Merge incremental: apenas atualiza os campos detectados na mensagem atual. - draft["payload"].update(extracted) - self._try_prefill_review_fields_from_memory(user_id=user_id, payload=draft["payload"]) - # Se o usuario responder apenas "sim/nao" no follow-up, preenche o slot booleano. - if ( - "revisao_previa_concessionaria" not in draft["payload"] - and draft["payload"] - and not extracted - ): - if self._is_affirmative_message(message): - draft["payload"]["revisao_previa_concessionaria"] = True - elif self._is_negative_message(message): - draft["payload"]["revisao_previa_concessionaria"] = False - draft["expires_at"] = datetime.utcnow() + timedelta(minutes=PENDING_REVIEW_DRAFT_TTL_MINUTES) - self.PENDING_REVIEW_DRAFTS[user_id] = draft - - # Enquanto faltar campo obrigatorio, responde de forma deterministica - # (sem depender do LLM para lembrar contexto). - missing = [field for field in REVIEW_REQUIRED_FIELDS if field not in draft["payload"]] - if missing: - return self._render_missing_review_fields_prompt(missing) - - try: - # Com payload completo, executa a tool de agendamento. - tool_result = await self.registry.execute( - "agendar_revisao", - draft["payload"], - user_id=user_id, - ) - except HTTPException as exc: - # Se houver conflito com sugestao de horario, salva para confirmar com "pode/sim". - self._capture_review_confirmation_suggestion( - tool_name="agendar_revisao", - arguments=draft["payload"], - exc=exc, - user_id=user_id, - ) - return self._http_exception_detail(exc) - finally: - # Limpa o rascunho apos tentativa final para evitar estado sujo. - self.PENDING_REVIEW_DRAFTS.pop(user_id, None) - - self._store_last_review_package(user_id=user_id, payload=draft["payload"]) - return self._fallback_format_tool_result("agendar_revisao", tool_result) - - async def _try_collect_and_create_order( - self, - message: str, - user_id: int | None, - extracted_fields: dict | None = None, - intents: dict | None = None, - ) -> str | None: - if user_id is None: - return None - - normalized_intents = self._normalize_intents(intents) - draft = self.PENDING_ORDER_DRAFTS.get(user_id) - if draft and draft["expires_at"] < datetime.utcnow(): - self.PENDING_ORDER_DRAFTS.pop(user_id, None) - draft = None - - extracted = self._normalize_order_fields(extracted_fields) - has_intent = normalized_intents.get("order_create", False) - - if ( - draft - and not has_intent - and ( - normalized_intents.get("review_schedule", False) - or normalized_intents.get("review_list", False) - or normalized_intents.get("review_cancel", False) - or normalized_intents.get("review_reschedule", False) - or normalized_intents.get("order_cancel", False) - ) - and not extracted - ): - self.PENDING_ORDER_DRAFTS.pop(user_id, None) - return None - - if not has_intent and draft is None: - return None - - if draft is None: - draft = { - "payload": {}, - "expires_at": datetime.utcnow() + timedelta(minutes=PENDING_ORDER_DRAFT_TTL_MINUTES), - } - - draft["payload"].update(extracted) - self._try_prefill_order_value_from_memory(user_id=user_id, payload=draft["payload"]) - - cpf_value = draft["payload"].get("cpf") - if cpf_value and not self._is_valid_cpf(str(cpf_value)): - draft["payload"].pop("cpf", None) - self.PENDING_ORDER_DRAFTS[user_id] = draft - return "Para seguir com o pedido, preciso de um CPF valido com 11 digitos." - - valor = draft["payload"].get("valor_veiculo") - if valor is not None: - try: - parsed = float(valor) - if parsed <= 0: - draft["payload"].pop("valor_veiculo", None) - else: - draft["payload"]["valor_veiculo"] = round(parsed, 2) - except (TypeError, ValueError): - draft["payload"].pop("valor_veiculo", None) - - draft["expires_at"] = datetime.utcnow() + timedelta(minutes=PENDING_ORDER_DRAFT_TTL_MINUTES) - self.PENDING_ORDER_DRAFTS[user_id] = draft - - missing = [field for field in ORDER_REQUIRED_FIELDS if field not in draft["payload"]] - if missing: - return self._render_missing_order_fields_prompt(missing) - - try: - tool_result = await self.registry.execute( - "realizar_pedido", - draft["payload"], - user_id=user_id, - ) - except HTTPException as exc: - return self._http_exception_detail(exc) - finally: - self.PENDING_ORDER_DRAFTS.pop(user_id, None) - - return self._fallback_format_tool_result("realizar_pedido", tool_result) - - async def _try_collect_and_cancel_order( - self, - message: str, - user_id: int | None, - extracted_fields: dict | None = None, - intents: dict | None = None, - ) -> str | None: - if user_id is None: - return None - - normalized_intents = self._normalize_intents(intents) - draft = self.PENDING_CANCEL_ORDER_DRAFTS.get(user_id) - if draft and draft["expires_at"] < datetime.utcnow(): - self.PENDING_CANCEL_ORDER_DRAFTS.pop(user_id, None) - draft = None - - extracted = self._normalize_cancel_order_fields(extracted_fields) - has_intent = normalized_intents.get("order_cancel", False) - - if ( - draft - and not has_intent - and ( - normalized_intents.get("review_schedule", False) - or normalized_intents.get("review_list", False) - or normalized_intents.get("review_cancel", False) - or normalized_intents.get("review_reschedule", False) - or normalized_intents.get("order_create", False) - ) - and not extracted - ): - self.PENDING_CANCEL_ORDER_DRAFTS.pop(user_id, None) - return None - - if not has_intent and draft is None: - return None - - if draft is None: - draft = { - "payload": {}, - "expires_at": datetime.utcnow() + timedelta(minutes=PENDING_CANCEL_ORDER_DRAFT_TTL_MINUTES), - } - - if ( - "motivo" not in extracted - and draft["payload"].get("numero_pedido") - and not has_intent - ): - free_text = (message or "").strip() - if free_text and len(free_text) >= 4: - extracted["motivo"] = free_text - - draft["payload"].update(extracted) - draft["expires_at"] = datetime.utcnow() + timedelta(minutes=PENDING_CANCEL_ORDER_DRAFT_TTL_MINUTES) - self.PENDING_CANCEL_ORDER_DRAFTS[user_id] = draft - - missing = [field for field in CANCEL_ORDER_REQUIRED_FIELDS if field not in draft["payload"]] - if missing: - return self._render_missing_cancel_order_fields_prompt(missing) - - try: - tool_result = await self.registry.execute( - "cancelar_pedido", - draft["payload"], - user_id=user_id, - ) - except HTTPException as exc: - return self._http_exception_detail(exc) - finally: - self.PENDING_CANCEL_ORDER_DRAFTS.pop(user_id, None) - - return self._fallback_format_tool_result("cancelar_pedido", tool_result) - def _is_affirmative_message(self, text: str) -> bool: normalized = self._normalize_text(text).strip() normalized = re.sub(r"[.!?,;:]+$", "", normalized) @@ -1723,10 +1177,10 @@ class OrquestradorService: if not payload.get("placa"): return payload["data_hora"] = suggested_iso - self.PENDING_REVIEW_CONFIRMATIONS[user_id] = { + self.state.set_entry("pending_review_confirmations", user_id, { "payload": payload, "expires_at": datetime.utcnow() + timedelta(minutes=PENDING_REVIEW_TTL_MINUTES), - } + }) async def _try_confirm_pending_review( self, @@ -1736,7 +1190,7 @@ class OrquestradorService: ) -> str | None: if user_id is None: return None - pending = self.PENDING_REVIEW_CONFIRMATIONS.get(user_id) + pending = self.state.get_entry("pending_review_confirmations", user_id, expire=True) if not pending: return None @@ -1749,7 +1203,7 @@ class OrquestradorService: if not new_data_hora and time_only: new_data_hora = self._merge_date_with_time(pending["payload"].get("data_hora", ""), time_only) if not new_data_hora: - self.PENDING_REVIEW_CONFIRMATIONS.pop(user_id, None) + self.state.pop_entry("pending_review_confirmations", user_id) return "Sem problema. Me informe a nova data e hora desejada para a revisao." payload = dict(pending["payload"]) @@ -1761,7 +1215,7 @@ class OrquestradorService: user_id=user_id, ) except HTTPException as exc: - self.PENDING_REVIEW_CONFIRMATIONS.pop(user_id, None) + self.state.pop_entry("pending_review_confirmations", user_id) self._capture_review_confirmation_suggestion( tool_name="agendar_revisao", arguments=payload, @@ -1770,16 +1224,12 @@ class OrquestradorService: ) return self._http_exception_detail(exc) - self.PENDING_REVIEW_CONFIRMATIONS.pop(user_id, None) + self.state.pop_entry("pending_review_confirmations", user_id) self._store_last_review_package(user_id=user_id, payload=payload) return self._fallback_format_tool_result("agendar_revisao", tool_result) if not self._is_affirmative_message(message): return None - if pending["expires_at"] < datetime.utcnow(): - self.PENDING_REVIEW_CONFIRMATIONS.pop(user_id, None) - return None - try: tool_result = await self.registry.execute( "agendar_revisao", @@ -1787,35 +1237,27 @@ class OrquestradorService: user_id=user_id, ) except HTTPException as exc: - self.PENDING_REVIEW_CONFIRMATIONS.pop(user_id, None) + self.state.pop_entry("pending_review_confirmations", user_id) return self._http_exception_detail(exc) - self.PENDING_REVIEW_CONFIRMATIONS.pop(user_id, None) + self.state.pop_entry("pending_review_confirmations", user_id) self._store_last_review_package(user_id=user_id, payload=pending.get("payload")) return self._fallback_format_tool_result("agendar_revisao", tool_result) def _build_router_prompt(self, user_message: str, user_id: int | None) -> str: - user_context = f"Contexto de usuario autenticado: user_id={user_id}.\n" if user_id else "" conversation_context = self._build_context_summary(user_id=user_id) - return ( - "Voce e um assistente de concessionaria. " - "Sempre que a solicitacao depender de dados operacionais (estoque, validacao de cliente, " - "avaliacao de troca, agendamento de revisao, realizacao ou cancelamento de pedido), use a tool correta. " - "Se faltar parametro obrigatorio para a tool, responda em texto pedindo apenas o que falta.\n\n" - f"{user_context}" - f"{conversation_context}\n" - f"Mensagem do usuario: {user_message}" + return build_router_prompt( + user_message=user_message, + user_id=user_id, + conversation_context=conversation_context, ) def _build_force_tool_prompt(self, user_message: str, user_id: int | None) -> str: - user_context = f"Contexto de usuario autenticado: user_id={user_id}.\n" if user_id else "" conversation_context = self._build_context_summary(user_id=user_id) - return ( - "Reavalie a mensagem e priorize chamar tool se houver intencao operacional. " - "Use texto apenas quando faltar dado obrigatorio.\n\n" - f"{user_context}" - f"{conversation_context}\n" - f"Mensagem do usuario: {user_message}" + return build_force_tool_prompt( + user_message=user_message, + user_id=user_id, + conversation_context=conversation_context, ) def _build_result_prompt( @@ -1825,17 +1267,13 @@ class OrquestradorService: tool_name: str, tool_result, ) -> str: - user_context = f"Contexto de usuario autenticado: user_id={user_id}.\n" if user_id else "" conversation_context = self._build_context_summary(user_id=user_id) - return ( - "Responda ao usuario de forma objetiva usando o resultado da tool abaixo. " - "Nao invente dados. Se a lista vier vazia, diga explicitamente que nao encontrou resultados. " - "Retorne texto puro sem markdown, sem asteriscos, sem emojis e com linhas curtas.\n\n" - f"{user_context}" - f"{conversation_context}\n" - f"Pergunta original: {user_message}\n" - f"Tool executada: {tool_name}\n" - f"Resultado da tool: {tool_result}" + return build_result_prompt( + user_message=user_message, + user_id=user_id, + tool_name=tool_name, + tool_result=tool_result, + conversation_context=conversation_context, ) def _http_exception_detail(self, exc: HTTPException) -> str: @@ -1844,134 +1282,5 @@ class OrquestradorService: return detail return "Nao foi possivel concluir a operacao solicitada." - def _format_datetime_for_chat(self, value: str) -> str: - try: - dt = datetime.fromisoformat((value or "").replace("Z", "+00:00")) - return dt.strftime("%d/%m/%Y %H:%M") - except Exception: - return value or "N/A" - - def _format_currency_br(self, value) -> str: - try: - number = float(value) - formatted = f"{number:,.2f}".replace(",", "X").replace(".", ",").replace("X", ".") - return f"R$ {formatted}" - except Exception: - return "N/A" - def _fallback_format_tool_result(self, tool_name: str, tool_result) -> str: - if tool_name == "consultar_estoque" and isinstance(tool_result, list): - if not tool_result: - return "Nao encontrei nenhum veiculo com os criterios informados." - linhas = [f"Encontrei {len(tool_result)} veiculo(s):"] - for idx, item in enumerate(tool_result[:10], start=1): - modelo = item.get("modelo", "N/A") - categoria = item.get("categoria", "N/A") - preco = self._format_currency_br(item.get("preco")) - linhas.append(f"{idx}. {modelo} ({categoria}) - {preco}") - restantes = len(tool_result) - 10 - if restantes > 0: - linhas.append(f"... e mais {restantes} veiculo(s).") - return "\n".join(linhas) - - if tool_name == "cancelar_pedido" and isinstance(tool_result, dict): - numero = tool_result.get("numero_pedido", "N/A") - status = tool_result.get("status", "N/A") - motivo = tool_result.get("motivo") - linhas = [f"Pedido {numero} atualizado.", f"Status: {status}"] - if motivo: - linhas.append(f"Motivo: {motivo}") - return "\n".join(linhas) - - if tool_name == "realizar_pedido" and isinstance(tool_result, dict): - numero = tool_result.get("numero_pedido", "N/A") - valor = self._format_currency_br(tool_result.get("valor_veiculo")) - return f"Pedido criado com sucesso.\nNumero: {numero}\nValor: {valor}" - - if tool_name == "agendar_revisao" and isinstance(tool_result, dict): - placa = tool_result.get("placa", "N/A") - data_hora = self._format_datetime_for_chat(tool_result.get("data_hora", "N/A")) - protocolo = tool_result.get("protocolo", "N/A") - valor = tool_result.get("valor_revisao") - if isinstance(valor, (int, float)): - return ( - "Revisao agendada com sucesso.\n" - f"Protocolo: {protocolo}\n" - f"Placa: {placa}\n" - f"Data/Hora: {data_hora}\n" - f"Valor estimado: {self._format_currency_br(valor)}" - ) - return ( - "Revisao agendada com sucesso.\n" - f"Protocolo: {protocolo}\n" - f"Placa: {placa}\n" - f"Data/Hora: {data_hora}" - ) - - if tool_name == "listar_agendamentos_revisao" and isinstance(tool_result, list): - if not tool_result: - return "Nao encontrei agendamentos de revisao para sua conta." - linhas = [f"Voce tem {len(tool_result)} agendamento(s):"] - for idx, item in enumerate(tool_result[:12], start=1): - protocolo = item.get("protocolo", "N/A") - placa = item.get("placa", "N/A") - data_hora = self._format_datetime_for_chat(item.get("data_hora", "N/A")) - status = item.get("status", "N/A") - linhas.append(f"{idx}) Protocolo: {protocolo}") - linhas.append(f"Placa: {placa}") - linhas.append(f"Data/Hora: {data_hora} | Status: {status}") - if idx < min(len(tool_result), 12): - linhas.append("") - restantes = len(tool_result) - 12 - if restantes > 0: - if linhas and linhas[-1] != "": - linhas.append("") - linhas.append(f"... e mais {restantes} agendamento(s).") - return "\n".join(linhas) - - if tool_name == "cancelar_agendamento_revisao" and isinstance(tool_result, dict): - protocolo = tool_result.get("protocolo", "N/A") - status = tool_result.get("status", "N/A") - placa = tool_result.get("placa", "N/A") - data_hora = self._format_datetime_for_chat(tool_result.get("data_hora", "N/A")) - return ( - "Agendamento atualizado.\n" - f"Protocolo: {protocolo}\n" - f"Placa: {placa}\n" - f"Data/Hora: {data_hora}\n" - f"Status: {status}" - ) - - if tool_name == "editar_data_revisao" and isinstance(tool_result, dict): - protocolo = tool_result.get("protocolo", "N/A") - placa = tool_result.get("placa", "N/A") - data_hora = self._format_datetime_for_chat(tool_result.get("data_hora", "N/A")) - status = tool_result.get("status", "N/A") - return ( - "Agendamento remarcado com sucesso.\n" - f"Protocolo: {protocolo}\n" - f"Placa: {placa}\n" - f"Nova data/hora: {data_hora}\n" - f"Status: {status}" - ) - - if tool_name == "validar_cliente_venda" and isinstance(tool_result, dict): - aprovado = tool_result.get("aprovado") - limite = self._format_currency_br(tool_result.get("limite_credito")) - score = tool_result.get("score", "N/A") - cpf = tool_result.get("cpf", "N/A") - if aprovado: - return ( - "Cliente aprovado para financiamento.\n" - f"CPF: {cpf}\n" - f"Score: {score}\n" - f"Limite: {limite}" - ) - return ( - "Cliente nao aprovado para financiamento.\n" - f"CPF: {cpf}\n" - f"Score: {score}\n" - f"Limite: {limite}" - ) - - return "Operacao concluida com sucesso." + return fallback_format_tool_result(tool_name=tool_name, tool_result=tool_result) diff --git a/app/services/orchestration/prompt_builders.py b/app/services/orchestration/prompt_builders.py new file mode 100644 index 0000000..7f7d746 --- /dev/null +++ b/app/services/orchestration/prompt_builders.py @@ -0,0 +1,57 @@ +from typing import Any + + +def _build_user_context_line(user_id: int | None) -> str: + return f"Contexto de usuario autenticado: user_id={user_id}.\n" if user_id else "" + + +def build_router_prompt( + user_message: str, + user_id: int | None, + conversation_context: str, +) -> str: + user_context = _build_user_context_line(user_id) + return ( + "Voce e um assistente de concessionaria. " + "Sempre que a solicitacao depender de dados operacionais (estoque, validacao de cliente, " + "avaliacao de troca, agendamento de revisao, realizacao ou cancelamento de pedido), use a tool correta. " + "Se faltar parametro obrigatorio para a tool, responda em texto pedindo apenas o que falta.\n\n" + f"{user_context}" + f"{conversation_context}\n" + f"Mensagem do usuario: {user_message}" + ) + + +def build_force_tool_prompt( + user_message: str, + user_id: int | None, + conversation_context: str, +) -> str: + user_context = _build_user_context_line(user_id) + return ( + "Reavalie a mensagem e priorize chamar tool se houver intencao operacional. " + "Use texto apenas quando faltar dado obrigatorio.\n\n" + f"{user_context}" + f"{conversation_context}\n" + f"Mensagem do usuario: {user_message}" + ) + + +def build_result_prompt( + user_message: str, + user_id: int | None, + tool_name: str, + tool_result: Any, + conversation_context: str, +) -> str: + user_context = _build_user_context_line(user_id) + return ( + "Responda ao usuario de forma objetiva usando o resultado da tool abaixo. " + "Nao invente dados. Se a lista vier vazia, diga explicitamente que nao encontrou resultados. " + "Retorne texto puro sem markdown, sem asteriscos, sem emojis e com linhas curtas.\n\n" + f"{user_context}" + f"{conversation_context}\n" + f"Pergunta original: {user_message}\n" + f"Tool executada: {tool_name}\n" + f"Resultado da tool: {tool_result}" + ) diff --git a/app/services/orchestration/response_formatter.py b/app/services/orchestration/response_formatter.py new file mode 100644 index 0000000..7bffafb --- /dev/null +++ b/app/services/orchestration/response_formatter.py @@ -0,0 +1,137 @@ +from datetime import datetime +from typing import Any + + +def format_datetime_for_chat(value: str) -> str: + try: + dt = datetime.fromisoformat((value or "").replace("Z", "+00:00")) + return dt.strftime("%d/%m/%Y %H:%M") + except Exception: + return value or "N/A" + + +def format_currency_br(value: Any) -> str: + try: + number = float(value) + formatted = f"{number:,.2f}".replace(",", "X").replace(".", ",").replace("X", ".") + return f"R$ {formatted}" + except Exception: + return "N/A" + + +def fallback_format_tool_result(tool_name: str, tool_result: Any) -> str: + if tool_name == "consultar_estoque" and isinstance(tool_result, list): + if not tool_result: + return "Nao encontrei nenhum veiculo com os criterios informados." + linhas = [f"Encontrei {len(tool_result)} veiculo(s):"] + for idx, item in enumerate(tool_result[:10], start=1): + modelo = item.get("modelo", "N/A") + categoria = item.get("categoria", "N/A") + preco = format_currency_br(item.get("preco")) + linhas.append(f"{idx}. {modelo} ({categoria}) - {preco}") + restantes = len(tool_result) - 10 + if restantes > 0: + linhas.append(f"... e mais {restantes} veiculo(s).") + return "\n".join(linhas) + + if tool_name == "cancelar_pedido" and isinstance(tool_result, dict): + numero = tool_result.get("numero_pedido", "N/A") + status = tool_result.get("status", "N/A") + motivo = tool_result.get("motivo") + linhas = [f"Pedido {numero} atualizado.", f"Status: {status}"] + if motivo: + linhas.append(f"Motivo: {motivo}") + return "\n".join(linhas) + + if tool_name == "realizar_pedido" and isinstance(tool_result, dict): + numero = tool_result.get("numero_pedido", "N/A") + valor = format_currency_br(tool_result.get("valor_veiculo")) + return f"Pedido criado com sucesso.\nNumero: {numero}\nValor: {valor}" + + if tool_name == "agendar_revisao" and isinstance(tool_result, dict): + placa = tool_result.get("placa", "N/A") + data_hora = format_datetime_for_chat(tool_result.get("data_hora", "N/A")) + protocolo = tool_result.get("protocolo", "N/A") + valor = tool_result.get("valor_revisao") + if isinstance(valor, (int, float)): + return ( + "Revisao agendada com sucesso.\n" + f"Protocolo: {protocolo}\n" + f"Placa: {placa}\n" + f"Data/Hora: {data_hora}\n" + f"Valor estimado: {format_currency_br(valor)}" + ) + return ( + "Revisao agendada com sucesso.\n" + f"Protocolo: {protocolo}\n" + f"Placa: {placa}\n" + f"Data/Hora: {data_hora}" + ) + + if tool_name == "listar_agendamentos_revisao" and isinstance(tool_result, list): + if not tool_result: + return "Nao encontrei agendamentos de revisao para sua conta." + linhas = [f"Voce tem {len(tool_result)} agendamento(s):"] + for idx, item in enumerate(tool_result[:12], start=1): + protocolo = item.get("protocolo", "N/A") + placa = item.get("placa", "N/A") + data_hora = format_datetime_for_chat(item.get("data_hora", "N/A")) + status = item.get("status", "N/A") + linhas.append(f"{idx}) Protocolo: {protocolo}") + linhas.append(f"Placa: {placa}") + linhas.append(f"Data/Hora: {data_hora} | Status: {status}") + if idx < min(len(tool_result), 12): + linhas.append("") + restantes = len(tool_result) - 12 + if restantes > 0: + if linhas and linhas[-1] != "": + linhas.append("") + linhas.append(f"... e mais {restantes} agendamento(s).") + return "\n".join(linhas) + + if tool_name == "cancelar_agendamento_revisao" and isinstance(tool_result, dict): + protocolo = tool_result.get("protocolo", "N/A") + status = tool_result.get("status", "N/A") + placa = tool_result.get("placa", "N/A") + data_hora = format_datetime_for_chat(tool_result.get("data_hora", "N/A")) + return ( + "Agendamento atualizado.\n" + f"Protocolo: {protocolo}\n" + f"Placa: {placa}\n" + f"Data/Hora: {data_hora}\n" + f"Status: {status}" + ) + + if tool_name == "editar_data_revisao" and isinstance(tool_result, dict): + protocolo = tool_result.get("protocolo", "N/A") + placa = tool_result.get("placa", "N/A") + data_hora = format_datetime_for_chat(tool_result.get("data_hora", "N/A")) + status = tool_result.get("status", "N/A") + return ( + "Agendamento remarcado com sucesso.\n" + f"Protocolo: {protocolo}\n" + f"Placa: {placa}\n" + f"Nova data/hora: {data_hora}\n" + f"Status: {status}" + ) + + if tool_name == "validar_cliente_venda" and isinstance(tool_result, dict): + aprovado = tool_result.get("aprovado") + limite = format_currency_br(tool_result.get("limite_credito")) + score = tool_result.get("score", "N/A") + cpf = tool_result.get("cpf", "N/A") + if aprovado: + return ( + "Cliente aprovado para financiamento.\n" + f"CPF: {cpf}\n" + f"Score: {score}\n" + f"Limite: {limite}" + ) + return ( + "Cliente nao aprovado para financiamento.\n" + f"CPF: {cpf}\n" + f"Score: {score}\n" + f"Limite: {limite}" + ) + + return "Operacao concluida com sucesso." diff --git a/app/services/handlers.py b/app/services/tools/handlers.py similarity index 100% rename from app/services/handlers.py rename to app/services/tools/handlers.py diff --git a/app/services/tool_registry.py b/app/services/tools/tool_registry.py similarity index 98% rename from app/services/tool_registry.py rename to app/services/tools/tool_registry.py index bdae591..bd87544 100644 --- a/app/services/tool_registry.py +++ b/app/services/tools/tool_registry.py @@ -5,7 +5,7 @@ from sqlalchemy.orm import Session from app.models.tool_model import ToolDefinition from app.repositories.tool_repository import ToolRepository -from app.services.handlers import ( +from app.services.tools.handlers import ( agendar_revisao, avaliar_veiculo_troca, cancelar_agendamento_revisao, diff --git a/app/services/user_service.py b/app/services/user/user_service.py similarity index 100% rename from app/services/user_service.py rename to app/services/user/user_service.py