import re from datetime import datetime, timedelta from fastapi import HTTPException from app.db.mock_database import SessionMockLocal from app.db.mock_models import User, Vehicle from app.services.orchestration.technical_normalizer import extract_budget_from_text, extract_cpf_from_text, is_valid_cpf from app.services.orchestration.orchestrator_config import ( CANCEL_ORDER_REQUIRED_FIELDS, ORDER_REQUIRED_FIELDS, PENDING_CANCEL_ORDER_DRAFT_TTL_MINUTES, PENDING_ORDER_DRAFT_TTL_MINUTES, ) from app.services.user.mock_customer_service import hydrate_mock_customer_from_cpf # Esse mixin cuida dos fluxos de venda: # criacao de pedido, selecao de veiculo e cancelamento. class OrderFlowMixin: def _decision_intent(self, turn_decision: dict | None) -> str: return str((turn_decision or {}).get("intent") or "").strip().lower() def _has_order_listing_request(self, message: str, turn_decision: dict | None = None) -> bool: normalized = self._normalize_text(message).strip() review_listing_terms = { "agendamento", "agendamentos", "revisao", "revisoes", } if any(term in normalized for term in review_listing_terms): return False if self._decision_intent(turn_decision) == "order_list": return True listing_terms = { "meus pedidos", "meu pedido", "listar pedidos", "liste meus pedidos", "lista de pedidos", "quais sao meus pedidos", "quais sao os meus pedidos", "mostrar pedidos", "mostre meus pedidos", "consultar pedidos", "ver meus pedidos", "acompanhar pedido", "acompanhar pedidos", "status do pedido", "status dos pedidos", } return any(term in normalized for term in listing_terms) def _has_explicit_order_request(self, message: str) -> bool: if self._has_order_listing_request(message): return False normalized = self._normalize_text(message).strip() order_terms = { "comprar", "compra", "pedido", "pedir", "financiar", "financiamento", "simular compra", "realizar pedido", "fazer um pedido", } return any(term in normalized for term in order_terms) def _has_stock_listing_request(self, message: str, turn_decision: dict | None = None) -> bool: if self._has_order_listing_request(message=message, turn_decision=turn_decision): return False if self._decision_intent(turn_decision) == "inventory_search": return True normalized = self._normalize_text(message).strip() stock_terms = { "estoque", "listar", "liste", "mostrar", "mostre", "ver carros", "ver veiculos", "opcoes", "modelos", } return any(term in normalized for term in stock_terms) def _is_valid_cpf(self, cpf: str) -> bool: return is_valid_cpf(cpf) def _try_capture_order_cpf_from_message(self, message: str, payload: dict) -> None: if payload.get("cpf"): return cpf = extract_cpf_from_text(message) if cpf and self._is_valid_cpf(cpf): payload["cpf"] = cpf def _try_capture_order_budget_from_message(self, user_id: int | None, message: str, payload: dict) -> None: if not self._has_explicit_order_request(message) and self.state.get_entry("pending_order_drafts", user_id, expire=True) is None: return context = self._get_user_context(user_id) if not isinstance(context, dict): return generic_memory = context.get("generic_memory") if not isinstance(generic_memory, dict): generic_memory = {} context["generic_memory"] = generic_memory if generic_memory.get("orcamento_max"): return budget = extract_budget_from_text(message) if budget: generic_memory["orcamento_max"] = int(round(budget)) context.setdefault("shared_memory", {})["orcamento_max"] = int(round(budget)) self._save_user_context(user_id=user_id, context=context) def _try_prefill_order_cpf_from_memory(self, user_id: int | None, payload: dict) -> None: if user_id is None or payload.get("cpf"): return context = self._get_user_context(user_id) if not context: return memory = context.get("generic_memory", {}) cpf = memory.get("cpf") if isinstance(cpf, str) and self._is_valid_cpf(cpf): payload["cpf"] = cpf def _try_prefill_order_cpf_from_user_profile(self, user_id: int | None, payload: dict) -> None: if user_id is None or payload.get("cpf"): return db = SessionMockLocal() try: user = db.query(User).filter(User.id == user_id).first() if user and isinstance(user.cpf, str) and self._is_valid_cpf(user.cpf): payload["cpf"] = user.cpf finally: db.close() def _get_last_stock_results(self, user_id: int | None) -> list[dict]: context = self._get_user_context(user_id) if not context: return [] stock_results = context.get("last_stock_results") or [] return stock_results if isinstance(stock_results, list) else [] def _get_selected_vehicle(self, user_id: int | None) -> dict | None: context = self._get_user_context(user_id) if not context: return None selected_vehicle = context.get("selected_vehicle") return dict(selected_vehicle) if isinstance(selected_vehicle, dict) else None def _get_pending_single_vehicle_confirmation(self, user_id: int | None) -> dict | None: context = self._get_user_context(user_id) if not context: return None pending_vehicle = context.get("pending_single_vehicle_confirmation") return dict(pending_vehicle) if isinstance(pending_vehicle, dict) else None def _remember_stock_results(self, user_id: int | None, stock_results: list[dict] | None) -> None: context = self._get_user_context(user_id) if not context: return sanitized: list[dict] = [] for item in stock_results or []: if not isinstance(item, dict): continue try: vehicle_id = int(item.get("id")) preco = float(item.get("preco") or 0) except (TypeError, ValueError): continue sanitized.append( { "id": vehicle_id, "modelo": str(item.get("modelo") or "").strip(), "categoria": str(item.get("categoria") or "").strip(), "preco": preco, } ) context["last_stock_results"] = sanitized if sanitized: context["selected_vehicle"] = None context["pending_single_vehicle_confirmation"] = None self._save_user_context(user_id=user_id, context=context) def _store_selected_vehicle(self, user_id: int | None, vehicle: dict | None) -> None: if user_id is None: return context = self._get_user_context(user_id) if not context: return context["selected_vehicle"] = dict(vehicle) if isinstance(vehicle, dict) else None context["pending_single_vehicle_confirmation"] = None self._save_user_context(user_id=user_id, context=context) def _store_pending_single_vehicle_confirmation(self, user_id: int | None, vehicle: dict | None) -> None: if user_id is None: return context = self._get_user_context(user_id) if not context: return context["pending_single_vehicle_confirmation"] = dict(vehicle) if isinstance(vehicle, dict) else None self._save_user_context(user_id=user_id, context=context) def _clear_pending_single_vehicle_confirmation(self, user_id: int | None) -> None: if user_id is None: return context = self._get_user_context(user_id) if not isinstance(context, dict): return context["pending_single_vehicle_confirmation"] = None self._save_user_context(user_id=user_id, context=context) def _vehicle_to_payload(self, vehicle: dict) -> dict: return { "vehicle_id": int(vehicle["id"]), "modelo_veiculo": str(vehicle["modelo"]), "valor_veiculo": round(float(vehicle["preco"]), 2), } def _try_prefill_order_vehicle_from_context(self, user_id: int | None, payload: dict) -> None: if user_id is None or payload.get("vehicle_id"): return if self._get_pending_single_vehicle_confirmation(user_id=user_id): return selected_vehicle = self._get_selected_vehicle(user_id=user_id) if selected_vehicle: payload.update(self._vehicle_to_payload(selected_vehicle)) def _build_stock_lookup_arguments(self, user_id: int | None, payload: dict | None = None) -> dict: context = self._get_user_context(user_id) generic_memory = context.get("generic_memory", {}) if isinstance(context, dict) else {} source = payload if isinstance(payload, dict) else {} budget = generic_memory.get("orcamento_max") if budget is None: budget = source.get("valor_veiculo") arguments: dict = {} if isinstance(budget, (int, float)) and float(budget) > 0: arguments["preco_max"] = float(budget) perfil = generic_memory.get("perfil_veiculo") if isinstance(perfil, list) and perfil: arguments["categoria"] = str(perfil[0]).strip().lower() arguments["limite"] = 5 arguments["ordenar_preco"] = "asc" return arguments def _should_refresh_stock_context(self, user_id: int | None, payload: dict | None = None) -> bool: if user_id is None: return False context = self._get_user_context(user_id) if not isinstance(context, dict): return False generic_memory = context.get("generic_memory", {}) if isinstance(context.get("generic_memory"), dict) else {} selected_vehicle = context.get("selected_vehicle") last_stock_results = context.get("last_stock_results") or [] source = payload if isinstance(payload, dict) else {} budget = generic_memory.get("orcamento_max") if isinstance(budget, (int, float)) and float(budget) > 0: if isinstance(selected_vehicle, dict): try: if float(selected_vehicle.get("preco") or 0) > float(budget): return True except (TypeError, ValueError): return True for item in last_stock_results: if not isinstance(item, dict): continue try: if float(item.get("preco") or 0) > float(budget): return True except (TypeError, ValueError): return True perfil = generic_memory.get("perfil_veiculo") expected_category = str(perfil[0]).strip().lower() if isinstance(perfil, list) and perfil else None if expected_category: if isinstance(selected_vehicle, dict): if str(selected_vehicle.get("categoria") or "").strip().lower() != expected_category: return True for item in last_stock_results: if not isinstance(item, dict): continue if str(item.get("categoria") or "").strip().lower() != expected_category: return True vehicle_budget = source.get("valor_veiculo") if isinstance(vehicle_budget, (int, float)) and isinstance(selected_vehicle, dict): try: if float(selected_vehicle.get("preco") or 0) > float(vehicle_budget): return True except (TypeError, ValueError): return True return False def _reset_order_stock_context(self, user_id: int | None) -> None: if user_id is None: return context = self._get_user_context(user_id) if not isinstance(context, dict): return context["last_stock_results"] = [] context["selected_vehicle"] = None context["pending_single_vehicle_confirmation"] = None self._save_user_context(user_id=user_id, context=context) def _match_vehicle_from_message_index(self, message: str, stock_results: list[dict]) -> dict | None: tokens = [token for token in re.findall(r"\d+", str(message or "")) if token.isdigit()] if not tokens: return None choice = int(tokens[0]) if 1 <= choice <= len(stock_results): return stock_results[choice - 1] return None def _match_vehicle_from_message_model(self, message: str, stock_results: list[dict]) -> dict | None: normalized_message = self._normalize_text(message) matches = [] for item in stock_results: normalized_model = self._normalize_text(str(item.get("modelo") or "")) if normalized_model and normalized_model in normalized_message: matches.append(item) if len(matches) == 1: return matches[0] return None def _load_vehicle_by_id(self, vehicle_id: int) -> dict | None: db = SessionMockLocal() try: vehicle = db.query(Vehicle).filter(Vehicle.id == vehicle_id).first() if not vehicle: return None return { "id": int(vehicle.id), "modelo": str(vehicle.modelo), "categoria": str(vehicle.categoria), "preco": float(vehicle.preco), } finally: db.close() def _try_resolve_order_vehicle(self, message: str, user_id: int | None, payload: dict) -> dict | None: # Primeiro tenta um vehicle_id explicito; depois tenta casar # a resposta do usuario com a ultima lista de estoque mostrada. vehicle_id = payload.get("vehicle_id") if isinstance(vehicle_id, int) and vehicle_id > 0: return self._load_vehicle_by_id(vehicle_id) stock_results = self._get_last_stock_results(user_id=user_id) selected_from_model = self._match_vehicle_from_message_model(message=message, stock_results=stock_results) if selected_from_model: return selected_from_model selected_from_index = self._match_vehicle_from_message_index(message=message, stock_results=stock_results) if selected_from_index: return selected_from_index normalized_model = self._normalize_text(str(payload.get("modelo_veiculo") or "")) if normalized_model: matches = [ item for item in stock_results if self._normalize_text(str(item.get("modelo") or "")) == normalized_model ] if len(matches) == 1: return matches[0] return None def _render_missing_order_fields_prompt(self, missing_fields: list[str]) -> str: labels = { "cpf": "o CPF do cliente", "vehicle_id": "qual veiculo do estoque voce quer comprar", } itens = [f"- {labels[field]}" for field in missing_fields] return "Para realizar o pedido, preciso dos dados abaixo:\n" + "\n".join(itens) def _render_vehicle_selection_from_stock_prompt(self, stock_results: list[dict]) -> str: lines = ["Para realizar o pedido, escolha primeiro qual veiculo voce quer comprar:"] for idx, item in enumerate(stock_results[:5], start=1): lines.append( f"- {idx}. {item.get('modelo', 'N/A')} " f"({item.get('categoria', 'N/A')}) - R$ {float(item.get('preco', 0)):.2f}" ) lines.append("Pode responder com o numero da lista ou com o modelo do veiculo.") return "\n".join(lines) def _render_single_vehicle_confirmation_prompt(self, vehicle: dict) -> str: return ( "Encontrei 1 opcao para o seu pedido:\n" f"- 1. {vehicle.get('modelo', 'N/A')} ({vehicle.get('categoria', 'N/A')}) - " f"R$ {float(vehicle.get('preco', 0)):.2f}\n" "Posso seguir com essa opcao? Responda com 1, sim ou com o modelo do veiculo." ) def _message_confirms_single_vehicle(self, message: str, vehicle: dict) -> bool: normalized_message = self._normalize_text(message).strip() if self._is_affirmative_message(message): return True if normalized_message == "1": return True normalized_model = self._normalize_text(str(vehicle.get("modelo") or "")).strip() if normalized_model and normalized_model in normalized_message: return True return False async def _try_list_stock_for_order_selection( self, message: str, user_id: int | None, payload: dict, turn_decision: dict | None = None, force: bool = False, ) -> str | None: if user_id is None: return None if not force and not self._has_stock_listing_request(message, turn_decision=turn_decision): return None arguments = self._build_stock_lookup_arguments(user_id=user_id, payload=payload) if "preco_max" not in arguments and "categoria" not in arguments: return None try: tool_result = await self.tool_executor.execute( "consultar_estoque", arguments, user_id=user_id, ) except HTTPException as exc: return self._http_exception_detail(exc) stock_results = tool_result if isinstance(tool_result, list) else [] self._remember_stock_results(user_id=user_id, stock_results=stock_results) if len(stock_results) == 1: self._store_pending_single_vehicle_confirmation(user_id=user_id, vehicle=stock_results[0]) return self._render_single_vehicle_confirmation_prompt(stock_results[0]) return self._fallback_format_tool_result("consultar_estoque", tool_result) def _render_missing_cancel_order_fields_prompt(self, missing_fields: list[str]) -> str: if missing_fields == ["motivo"]: return "Encontrei o pedido informado. Qual o motivo do cancelamento?" labels = { "numero_pedido": "o numero do pedido (ex.: PED-20260305123456-ABC123)", "motivo": "o motivo do cancelamento", } itens = [f"- {labels[field]}" for field in missing_fields] return "Para cancelar o pedido, preciso dos dados abaixo:\n" + "\n".join(itens) async def _try_handle_order_listing( self, message: str, user_id: int | None, intents: dict | None = None, turn_decision: dict | None = None, ) -> str | None: if user_id is None: return None normalized_intents = self._normalize_intents(intents) if any(term in self._normalize_text(message).strip() for term in {"agendamento", "agendamentos", "revisao", "revisoes"}): return None has_intent = ( self._decision_intent(turn_decision) == "order_list" or normalized_intents.get("order_list", False) or self._has_order_listing_request(message=message, turn_decision=turn_decision) ) if not has_intent: return None try: tool_result = await self.tool_executor.execute( "listar_pedidos", {"limite": 10}, user_id=user_id, ) except HTTPException as exc: return self._http_exception_detail(exc) return self._fallback_format_tool_result("listar_pedidos", tool_result) async def _try_collect_and_create_order( self, message: str, user_id: int | None, extracted_fields: dict | None = None, intents: dict | None = None, turn_decision: dict | None = None, ) -> str | None: if user_id is None: return None normalized_intents = self._normalize_intents(intents) draft = self.state.get_entry("pending_order_drafts", user_id, expire=True) extracted = self._normalize_order_fields(extracted_fields) decision_intent = self._decision_intent(turn_decision) has_intent = decision_intent == "order_create" or normalized_intents.get("order_create", False) explicit_order_request = self._has_explicit_order_request(message) if ( draft and not has_intent and ( decision_intent in { "review_schedule", "review_list", "review_cancel", "review_reschedule", "order_cancel", } or normalized_intents.get("review_schedule", False) or normalized_intents.get("review_list", False) or normalized_intents.get("review_cancel", False) or normalized_intents.get("review_reschedule", False) or normalized_intents.get("order_cancel", False) ) and not extracted ): self.state.pop_entry("pending_order_drafts", user_id) return None if draft is None and not has_intent and not explicit_order_request: return None if draft is None: draft = { "payload": {}, "expires_at": datetime.utcnow() + timedelta(minutes=PENDING_ORDER_DRAFT_TTL_MINUTES), } draft["payload"].update(extracted) self._try_capture_order_cpf_from_message(message=message, payload=draft["payload"]) self._try_capture_order_budget_from_message(user_id=user_id, message=message, payload=draft["payload"]) self._try_prefill_order_cpf_from_memory(user_id=user_id, payload=draft["payload"]) self._try_prefill_order_vehicle_from_context(user_id=user_id, payload=draft["payload"]) if self._should_refresh_stock_context(user_id=user_id, payload=draft["payload"]): self._reset_order_stock_context(user_id=user_id) draft["payload"].pop("vehicle_id", None) draft["payload"].pop("modelo_veiculo", None) draft["payload"].pop("valor_veiculo", None) pending_single_vehicle = self._get_pending_single_vehicle_confirmation(user_id=user_id) if pending_single_vehicle and not draft["payload"].get("vehicle_id"): if self._message_confirms_single_vehicle(message=message, vehicle=pending_single_vehicle): self._store_selected_vehicle(user_id=user_id, vehicle=pending_single_vehicle) draft["payload"].update(self._vehicle_to_payload(pending_single_vehicle)) pending_single_vehicle = None elif self._is_negative_message(message): self._clear_pending_single_vehicle_confirmation(user_id=user_id) draft["expires_at"] = datetime.utcnow() + timedelta(minutes=PENDING_ORDER_DRAFT_TTL_MINUTES) self.state.set_entry("pending_order_drafts", user_id, draft) return "Sem problema. Me diga outro modelo ou ajuste o valor para eu buscar novas opcoes." elif not self._has_explicit_order_request(message): draft["expires_at"] = datetime.utcnow() + timedelta(minutes=PENDING_ORDER_DRAFT_TTL_MINUTES) self.state.set_entry("pending_order_drafts", user_id, draft) return self._render_single_vehicle_confirmation_prompt(pending_single_vehicle) resolved_vehicle = self._try_resolve_order_vehicle( message=message, user_id=user_id, payload=draft["payload"], ) if resolved_vehicle: # Mantem a selecao no estado para que o usuario informe # o CPF depois sem perder o veiculo escolhido. self._store_selected_vehicle(user_id=user_id, vehicle=resolved_vehicle) draft["payload"].update(self._vehicle_to_payload(resolved_vehicle)) cpf_value = draft["payload"].get("cpf") if cpf_value and not self._is_valid_cpf(str(cpf_value)): draft["payload"].pop("cpf", None) self.state.set_entry("pending_order_drafts", user_id, draft) return "Para seguir com o pedido, preciso de um CPF valido. Pode me informar novamente?" if cpf_value: try: await hydrate_mock_customer_from_cpf( cpf=str(cpf_value), user_id=user_id, ) except ValueError: draft["payload"].pop("cpf", None) self.state.set_entry("pending_order_drafts", user_id, draft) return "Para seguir com o pedido, preciso de um CPF valido. Pode me informar novamente?" draft["expires_at"] = datetime.utcnow() + timedelta(minutes=PENDING_ORDER_DRAFT_TTL_MINUTES) self.state.set_entry("pending_order_drafts", user_id, draft) missing = [field for field in ORDER_REQUIRED_FIELDS if field not in draft["payload"]] if missing: if "vehicle_id" in missing: stock_response = await self._try_list_stock_for_order_selection( message=message, user_id=user_id, payload=draft["payload"], turn_decision=turn_decision, force=bool(draft) or has_intent or explicit_order_request, ) if stock_response: return stock_response stock_results = self._get_last_stock_results(user_id=user_id) if stock_results: return self._render_vehicle_selection_from_stock_prompt(stock_results) return self._render_missing_order_fields_prompt(missing) try: tool_result = await self.tool_executor.execute( "realizar_pedido", { "cpf": draft["payload"]["cpf"], "vehicle_id": draft["payload"]["vehicle_id"], }, user_id=user_id, ) except HTTPException as exc: error = self.tool_executor.coerce_http_error(exc) if error.get("retryable") and error.get("field"): draft["payload"].pop(str(error["field"]), None) draft["expires_at"] = datetime.utcnow() + timedelta(minutes=PENDING_ORDER_DRAFT_TTL_MINUTES) self.state.set_entry("pending_order_drafts", user_id, draft) return self._http_exception_detail(exc) self.state.pop_entry("pending_order_drafts", user_id) return self._fallback_format_tool_result("realizar_pedido", tool_result) async def _try_collect_and_cancel_order( self, message: str, user_id: int | None, extracted_fields: dict | None = None, intents: dict | None = None, turn_decision: dict | None = None, ) -> str | None: if user_id is None: return None normalized_intents = self._normalize_intents(intents) draft = self.state.get_entry("pending_cancel_order_drafts", user_id, expire=True) active_order_draft = self.state.get_entry("pending_order_drafts", user_id, expire=True) extracted = self._normalize_cancel_order_fields(extracted_fields) decision_intent = self._decision_intent(turn_decision) has_intent = decision_intent == "order_cancel" or normalized_intents.get("order_cancel", False) if ( draft is None and active_order_draft is not None and ( not has_intent or ("numero_pedido" not in extracted and "motivo" not in extracted) ) ): return None if ( draft and not has_intent and ( decision_intent in { "review_schedule", "review_list", "review_cancel", "review_reschedule", "order_list", "order_create", } or normalized_intents.get("review_schedule", False) or normalized_intents.get("review_list", False) or normalized_intents.get("review_cancel", False) or normalized_intents.get("review_reschedule", False) or normalized_intents.get("order_list", False) or normalized_intents.get("order_create", False) ) and not extracted ): self.state.pop_entry("pending_cancel_order_drafts", user_id) return None if not has_intent and draft is None: return None if draft is None: draft = { "payload": {}, "expires_at": datetime.utcnow() + timedelta(minutes=PENDING_CANCEL_ORDER_DRAFT_TTL_MINUTES), } if ( "motivo" not in extracted and draft["payload"].get("numero_pedido") and "numero_pedido" not in extracted ): # Quando o pedido ja foi identificado, um texto livre curto # e tratado como motivo do cancelamento. free_text = (message or "").strip() if free_text and len(free_text) >= 4: extracted["motivo"] = free_text draft["payload"].update(extracted) draft["expires_at"] = datetime.utcnow() + timedelta(minutes=PENDING_CANCEL_ORDER_DRAFT_TTL_MINUTES) self.state.set_entry("pending_cancel_order_drafts", user_id, draft) missing = [field for field in CANCEL_ORDER_REQUIRED_FIELDS if field not in draft["payload"]] if missing: return self._render_missing_cancel_order_fields_prompt(missing) try: tool_result = await self.tool_executor.execute( "cancelar_pedido", draft["payload"], user_id=user_id, ) except HTTPException as exc: error = self.tool_executor.coerce_http_error(exc) if error.get("retryable") and error.get("field"): draft["payload"].pop(str(error["field"]), None) draft["expires_at"] = datetime.utcnow() + timedelta(minutes=PENDING_CANCEL_ORDER_DRAFT_TTL_MINUTES) self.state.set_entry("pending_cancel_order_drafts", user_id, draft) return self._http_exception_detail(exc) self.state.pop_entry("pending_cancel_order_drafts", user_id) return self._fallback_format_tool_result("cancelar_pedido", tool_result)