import re
import json
import logging
import unicodedata
from datetime import datetime, timedelta

from fastapi import HTTPException
from sqlalchemy.orm import Session

from app.services.orchestrator_config import (
    CANCEL_ORDER_REQUIRED_FIELDS,
    DETERMINISTIC_RESPONSE_TOOLS,
    LOW_VALUE_RESPONSES,
    LAST_REVIEW_PACKAGE_TTL_MINUTES,
    ORDER_REQUIRED_FIELDS,
    PENDING_CANCEL_ORDER_DRAFT_TTL_MINUTES,
    PENDING_ORDER_DRAFT_TTL_MINUTES,
    PENDING_REVIEW_DRAFT_TTL_MINUTES,
    PENDING_REVIEW_TTL_MINUTES,
    REVIEW_REQUIRED_FIELDS,
    USER_CONTEXT_TTL_MINUTES,
)
from app.services.llm_service import LLMService
from app.services.tool_registry import ToolRegistry

logger = logging.getLogger(__name__)


class OrquestradorService:
    """Route chat messages to review/sales flows, tools, or a plain LLM answer.

    All conversational state lives in the class-level dictionaries below, keyed
    by ``user_id``; they are shared by every instance of the service.
    """

    USER_CONTEXTS: dict[int, dict] = {}
    # Temporary confirmation memory used when the API suggests a new time slot (409 conflict).
    PENDING_REVIEW_CONFIRMATIONS: dict[int, dict] = {}
    # Per-user draft used to merge review data sent across separate messages.
    PENDING_REVIEW_DRAFTS: dict[int, dict] = {}
    PENDING_REVIEW_MANAGEMENT_DRAFTS: dict[int, dict] = {}
    LAST_REVIEW_PACKAGES: dict[int, dict] = {}
    PENDING_REVIEW_REUSE_CONFIRMATIONS: dict[int, dict] = {}
    PENDING_ORDER_DRAFTS: dict[int, dict] = {}
    PENDING_CANCEL_ORDER_DRAFTS: dict[int, dict] = {}

    # Lifetime of a "pending_switch" entry stored in the user context.
    PENDING_SWITCH_TTL_MINUTES = 15
    # Domains accepted in routing plans; anything else is coerced to "general".
    _VALID_DOMAINS = frozenset({"review", "sales", "general"})
    # (pattern, factor) pairs checked in order by _normalize_positive_number.
    # Accented and unaccented spellings are both listed because the output of
    # _normalize_text is not visible from this part of the file.
    _MAGNITUDE_WORDS = (
        (r"(?<![a-z])milh(?:ao|oes|ão|ões)(?![a-z])", 1_000_000),
        (r"(?<![a-z])mil(?![a-z])", 1000),
    )

    def __init__(self, db: Session):
        """Initialise the LLM service and the tool registry for the current session."""
        self.llm = LLMService()
        self.registry = ToolRegistry(db)

    async def handle_message(self, message: str, user_id: int | None = None) -> str:
        """Process a message, run a tool when needed and return the final reply.

        The message goes through, in order: queued-order continuation, message
        planning, context-switch handling, the incremental review/order flows
        and finally the LLM router with optional tool execution.
        """

        async def finish(response: str, queue_notice: str | None = None) -> str:
            composed = self._compose_order_aware_response(
                response=response,
                user_id=user_id,
                queue_notice=queue_notice,
            )
            return await self._maybe_auto_advance_next_order(
                base_response=composed,
                user_id=user_id,
            )

        self._upsert_user_context(user_id=user_id)

        queued_followup = await self._try_continue_queued_order(message=message, user_id=user_id)
        if queued_followup:
            return queued_followup

        message_plan = await self._extract_message_plan_with_llm(
            message=message,
            user_id=user_id,
        )
        routing_plan = self._routing_plan_from_message_plan(message_plan)
        (
            routing_message,
            queue_notice,
            queue_early_response,
        ) = self._prepare_message_for_single_order(
            message=message,
            user_id=user_id,
            routing_plan=routing_plan,
        )
        if queue_early_response:
            return await finish(queue_early_response, queue_notice=queue_notice)

        extracted_entities = self._resolve_entities_for_message_plan(
            message_plan=message_plan,
            routed_message=routing_message,
        )
        if not self._has_useful_extraction(extracted_entities):
            extracted_entities = await self._extract_entities_with_llm(
                message=routing_message,
                user_id=user_id,
            )
        intents = extracted_entities.get("intents", {})

        self._capture_generic_memory(
            user_id=user_id,
            llm_generic_fields=extracted_entities.get("generic_memory", {}),
        )

        domain_hint = self._domain_from_intents(intents)
        context_switch_response = self._handle_context_switch(
            message=routing_message,
            user_id=user_id,
            target_domain_hint=domain_hint,
        )
        if context_switch_response:
            return await finish(context_switch_response, queue_notice=queue_notice)
        self._update_active_domain(user_id=user_id, domain_hint=domain_hint)

        review_management_response = await self._try_handle_review_management(
            message=routing_message,
            user_id=user_id,
            extracted_fields=extracted_entities.get("review_management_fields", {}),
            intents=intents,
        )
        if review_management_response:
            return await finish(review_management_response, queue_notice=queue_notice)

        # 1) If a time-slot suggestion is pending and the user confirmed ("pode/sim"),
        #    schedule directly at the suggested time.
        confirmation_response = await self._try_confirm_pending_review(
            message=routing_message,
            user_id=user_id,
            extracted_review_fields=extracted_entities.get("review_fields", {}),
        )
        if confirmation_response:
            return await finish(confirmation_response, queue_notice=queue_notice)

        # 2) Incremental collection of review data (slot filling).
        #    Avoids asking for everything again when the user answers in parts.
        review_response = await self._try_collect_and_schedule_review(
            message=routing_message,
            user_id=user_id,
            extracted_fields=extracted_entities.get("review_fields", {}),
            intents=intents,
        )
        if review_response:
            return await finish(review_response, queue_notice=queue_notice)

        # 3) Incremental collection flow for order cancellation.
        cancel_order_response = await self._try_collect_and_cancel_order(
            message=routing_message,
            user_id=user_id,
            extracted_fields=extracted_entities.get("cancel_order_fields", {}),
            intents=intents,
        )
        if cancel_order_response:
            return await finish(cancel_order_response, queue_notice=queue_notice)

        # 4) Incremental collection flow for order creation.
        order_response = await self._try_collect_and_create_order(
            message=routing_message,
            user_id=user_id,
            extracted_fields=extracted_entities.get("order_fields", {}),
            intents=intents,
        )
        if order_response:
            return await finish(order_response, queue_notice=queue_notice)

        tools = self.registry.get_tools()
        llm_result = await self.llm.generate_response(
            message=self._build_router_prompt(user_message=routing_message, user_id=user_id),
            tools=tools,
        )
        first_pass_text = (llm_result.get("response") or "").strip()
        # Retry with a stricter prompt when the user clearly wants an operation
        # but the first pass produced neither a tool call nor a useful answer.
        should_force_tool = (
            not llm_result.get("tool_call")
            and self._has_operational_intent(extracted_entities)
            and self._is_low_value_response(first_pass_text)
        )
        if should_force_tool:
            llm_result = await self.llm.generate_response(
                message=self._build_force_tool_prompt(user_message=routing_message, user_id=user_id),
                tools=tools,
            )

        # .get() tolerates an LLM result that omits the key altogether.
        tool_call = llm_result.get("tool_call")
        if tool_call:
            tool_name = tool_call["name"]
            arguments = tool_call["arguments"]
            try:
                tool_result = await self.registry.execute(
                    tool_name,
                    arguments,
                    user_id=user_id,
                )
            except HTTPException as exc:
                self._capture_review_confirmation_suggestion(
                    tool_name=tool_name,
                    arguments=arguments,
                    exc=exc,
                    user_id=user_id,
                )
                return await finish(self._http_exception_detail(exc), queue_notice=queue_notice)

            if self._should_use_deterministic_response(tool_name):
                return await finish(
                    self._fallback_format_tool_result(tool_name, tool_result),
                    queue_notice=queue_notice,
                )

            final_response = await self.llm.generate_response(
                message=self._build_result_prompt(
                    user_message=routing_message,
                    user_id=user_id,
                    tool_name=tool_name,
                    tool_result=tool_result,
                ),
                tools=[],
            )
            text = (final_response.get("response") or "").strip()
            if self._is_low_value_response(text) or not text:
                return await finish(
                    self._fallback_format_tool_result(tool_name, tool_result),
                    queue_notice=queue_notice,
                )
            return await finish(text, queue_notice=queue_notice)

        text = (llm_result.get("response") or "").strip()
        if self._is_low_value_response(text):
            return await finish(
                "Entendi. Pode me dar mais detalhes para eu consultar corretamente?",
                queue_notice=queue_notice,
            )
        return await finish(text, queue_notice=queue_notice)

    def _reset_pending_review_states(self, user_id: int | None) -> None:
        """Drop every pending review draft/confirmation stored for the user."""
        if user_id is None:
            return
        self.PENDING_REVIEW_DRAFTS.pop(user_id, None)
        self.PENDING_REVIEW_CONFIRMATIONS.pop(user_id, None)
        self.PENDING_REVIEW_MANAGEMENT_DRAFTS.pop(user_id, None)
        self.PENDING_REVIEW_REUSE_CONFIRMATIONS.pop(user_id, None)

    def _reset_pending_order_states(self, user_id: int | None) -> None:
        """Drop the pending order and cancel-order drafts stored for the user."""
        if user_id is None:
            return
        self.PENDING_ORDER_DRAFTS.pop(user_id, None)
        self.PENDING_CANCEL_ORDER_DRAFTS.pop(user_id, None)

    # This is where the volatile (in-memory) state of the system is set up.
    def _upsert_user_context(self, user_id: int | None) -> None:
        """Refresh the TTL of a live user context or create a fresh one."""
        if user_id is None:
            return
        now = datetime.utcnow()
        expires_at = now + timedelta(minutes=USER_CONTEXT_TTL_MINUTES)
        context = self.USER_CONTEXTS.get(user_id)
        if context and context["expires_at"] >= now:
            context["expires_at"] = expires_at
            return
        self.USER_CONTEXTS[user_id] = {
            "active_domain": "general",
            "generic_memory": {},
            "shared_memory": {},
            "order_queue": [],
            "pending_switch": None,
            "expires_at": expires_at,
        }

    def _get_user_context(self, user_id: int | None) -> dict | None:
        """Return the user's context, evicting it first when it has expired."""
        if user_id is None:
            return None
        context = self.USER_CONTEXTS.get(user_id)
        if not context:
            return None
        if context["expires_at"] < datetime.utcnow():
            self.USER_CONTEXTS.pop(user_id, None)
            return None
        return context

    def _extract_generic_memory_fields(self, llm_generic_fields: dict | None = None) -> dict:
        """Normalise LLM-provided generic fields (placa, cpf, orcamento_max, perfil_veiculo)."""
        # Same rules as _normalize_generic_fields; delegate instead of duplicating them.
        return self._normalize_generic_fields(llm_generic_fields or {})

    def _capture_generic_memory(
        self,
        user_id: int | None,
        llm_generic_fields: dict | None = None,
    ) -> None:
        """Merge normalised generic fields into the user's generic and shared memory."""
        context = self._get_user_context(user_id)
        if not context:
            return
        fields = self._extract_generic_memory_fields(llm_generic_fields=llm_generic_fields)
        if fields:
            # "Generic memory" is a dict accumulated per user.
            # New fields are added and repeated fields overwrite the old value.
            context["generic_memory"].update(fields)
            context.setdefault("shared_memory", {}).update(fields)

    def _new_tab_memory(self, user_id: int | None) -> dict:
        """Return a shallow copy of the user's shared memory ({} when unavailable)."""
        context = self._get_user_context(user_id)
        if not context:
            return {}
        shared = context.get("shared_memory", {})
        if not isinstance(shared, dict):
            return {}
        return dict(shared)

    def _empty_extraction_payload(self) -> dict:
        """Return the extraction contract with every section empty."""
        return {
            "generic_memory": {},
            "review_fields": {},
            "review_management_fields": {},
            "order_fields": {},
            "cancel_order_fields": {},
            "intents": {},
        }

    def _empty_message_plan(self, message: str) -> dict:
        """Return a plan with a single "general" order holding the whole message."""
        return {
            "orders": [
                {
                    "domain": "general",
                    "message": (message or "").strip(),
                    "entities": self._empty_extraction_payload(),
                }
            ]
        }

    def _coerce_domain(self, value) -> str:
        """Lower-case/strip a domain value, falling back to "general" when unknown."""
        domain = str(value or "general").strip().lower()
        return domain if domain in self._VALID_DOMAINS else "general"

    def _routing_plan_from_message_plan(self, message_plan: dict) -> dict:
        """Project a message plan down to its domain/message pairs."""
        return {
            "orders": [
                {
                    "domain": item.get("domain", "general"),
                    "message": item.get("message", ""),
                }
                for item in message_plan.get("orders", [])
            ]
        }

    def _coerce_message_plan(self, payload, message: str) -> dict:
        """Validate an LLM plan payload; fall back to the single-order default plan."""
        default = self._empty_message_plan(message=message)
        if not isinstance(payload, dict):
            return default
        raw_orders = payload.get("orders")
        if not isinstance(raw_orders, list):
            return default

        normalized_orders: list[dict] = []
        for item in raw_orders:
            if not isinstance(item, dict):
                continue
            segment = str(item.get("message") or "").strip()
            if not segment:
                continue
            normalized_orders.append(
                {
                    "domain": self._coerce_domain(item.get("domain")),
                    "message": segment,
                    "entities": self._coerce_extraction_contract(item.get("entities")),
                }
            )
        if not normalized_orders:
            return default
        return {"orders": normalized_orders}

    def _coerce_extraction_contract(self, payload) -> dict:
        """Return a contract with all expected sections, replacing non-dict sections by {}."""
        if not isinstance(payload, dict):
            return self._empty_extraction_payload()
        contract = self._empty_extraction_payload()
        for key in contract:
            value = payload.get(key)
            contract[key] = value if isinstance(value, dict) else {}
            if key not in payload:
                logger.info("Extracao sem secao '%s'; usando vazio.", key)
        return contract

    def _normalize_extraction_contract(self, contract: dict) -> dict:
        """Run each section of a coerced contract through its normaliser."""
        return {
            "generic_memory": self._normalize_generic_fields(contract.get("generic_memory")),
            "review_fields": self._normalize_review_fields(contract.get("review_fields")),
            "review_management_fields": self._normalize_review_management_fields(
                contract.get("review_management_fields")
            ),
            "order_fields": self._normalize_order_fields(contract.get("order_fields")),
            "cancel_order_fields": self._normalize_cancel_order_fields(contract.get("cancel_order_fields")),
            "intents": self._normalize_intents(contract.get("intents")),
        }

    async def _extract_message_plan_with_llm(self, message: str, user_id: int | None) -> dict:
        """Ask the LLM to split the message into orders with per-order entities.

        Returns the default single-order plan when the call fails or the answer
        is not a JSON object.
        """
        prompt = (
            "Analise a mensagem e retorne APENAS JSON valido com roteamento e entidades por pedido.\n"
            "Sem markdown e sem texto extra.\n\n"
            "Formato:\n"
            "{\n"
            ' "orders": [\n'
            " {\n"
            ' "domain": "review|sales|general",\n'
            ' "message": "trecho literal do pedido",\n'
            ' "entities": {\n'
            ' "generic_memory": {"placa": null, "cpf": null, "orcamento_max": null, "perfil_veiculo": []},\n'
            ' "review_fields": {"placa": null, "data_hora": null, "modelo": null, "ano": null, "km": null, "revisao_previa_concessionaria": null},\n'
            ' "review_management_fields": {"protocolo": null, "nova_data_hora": null, "motivo": null},\n'
            ' "order_fields": {"cpf": null, "valor_veiculo": null},\n'
            ' "cancel_order_fields": {"numero_pedido": null, "motivo": null},\n'
            ' "intents": {"review_schedule": false, "review_list": false, "review_cancel": false, "review_reschedule": false, "order_create": false, "order_cancel": false}\n'
            " }\n"
            " }\n"
            " ]\n"
            "}\n\n"
            "Regras:\n"
            "- Se houver mais de um pedido operacional, separe em itens distintos em ordem de aparicao.\n"
            "- Se nao houver pedido operacional, use domain='general' com a mensagem inteira.\n"
            "- Mantenha cada message curta e fiel ao texto do usuario.\n\n"
            f"Contexto: user_id={user_id if user_id is not None else 'anonimo'}\n"
            f"Mensagem do usuario: {message}"
        )
        default = self._empty_message_plan(message=message)
        try:
            result = await self.llm.generate_response(message=prompt, tools=[])
            text = (result.get("response") or "").strip()
            payload = self._parse_json_object(text)
            if not isinstance(payload, dict):
                logger.warning("Plano de mensagem invalido (nao JSON objeto). user_id=%s", user_id)
                return default
            return self._coerce_message_plan(payload=payload, message=message)
        except Exception:
            # Best effort: planning must never break message handling.
            logger.exception("Falha ao extrair plano da mensagem com LLM. user_id=%s", user_id)
            return default

    async def _extract_routing_with_llm(self, message: str, user_id: int | None) -> dict:
        """Return only the domain/message routing part of the LLM message plan."""
        plan = await self._extract_message_plan_with_llm(message=message, user_id=user_id)
        return self._routing_plan_from_message_plan(plan)

    async def _extract_entities_with_llm(self, message: str, user_id: int | None) -> dict:
        """Ask the LLM for the entities of a single message and normalise them.

        Returns the empty extraction payload when the call fails or the answer
        is empty or not a JSON object.
        """
        user_context = f"user_id={user_id}" if user_id is not None else "user_id=anonimo"
        prompt = (
            "Extraia entidades da mensagem do usuario e retorne APENAS JSON valido.\n"
            "Nao use markdown, nao adicione texto antes/depois, nao invente dados ausentes.\n"
            "Se nao houver valor, use null ou lista vazia.\n\n"
            "Formato obrigatorio:\n"
            "{\n"
            ' "generic_memory": {\n'
            ' "placa": null,\n'
            ' "cpf": null,\n'
            ' "orcamento_max": null,\n'
            ' "perfil_veiculo": []\n'
            " },\n"
            ' "review_fields": {\n'
            ' "placa": null,\n'
            ' "data_hora": null,\n'
            ' "modelo": null,\n'
            ' "ano": null,\n'
            ' "km": null,\n'
            ' "revisao_previa_concessionaria": null\n'
            " },\n"
            ' "review_management_fields": {\n'
            ' "protocolo": null,\n'
            ' "nova_data_hora": null,\n'
            ' "motivo": null\n'
            " },\n"
            ' "order_fields": {\n'
            ' "cpf": null,\n'
            ' "valor_veiculo": null\n'
            " },\n"
            ' "cancel_order_fields": {\n'
            ' "numero_pedido": null,\n'
            ' "motivo": null\n'
            " },\n"
            ' "intents": {\n'
            ' "review_schedule": false,\n'
            ' "review_list": false,\n'
            ' "review_cancel": false,\n'
            ' "review_reschedule": false,\n'
            ' "order_create": false,\n'
            ' "order_cancel": false\n'
            " }\n"
            "}\n\n"
            f"Contexto: {user_context}\n"
            f"Mensagem do usuario: {message}"
        )
        default = self._empty_extraction_payload()
        try:
            result = await self.llm.generate_response(message=prompt, tools=[])
            text = (result.get("response") or "").strip()
            if not text:
                logger.warning("Extracao vazia do LLM. user_id=%s", user_id)
                return default
            payload = self._parse_json_object(text)
            if not isinstance(payload, dict):
                logger.warning("Extracao invalida (nao JSON objeto). user_id=%s", user_id)
                return default
            coerced = self._coerce_extraction_contract(payload)
            return self._normalize_extraction_contract(coerced)
        except Exception:
            # Best effort: extraction must never break message handling.
            logger.exception("Falha ao extrair entidades com LLM. user_id=%s", user_id)
            return default

    def _resolve_entities_for_message_plan(self, message_plan: dict, routed_message: str) -> dict:
        """Return the normalised entities of the plan order whose text equals the routed message."""
        default = self._empty_extraction_payload()
        if not isinstance(message_plan, dict):
            return default
        target = (routed_message or "").strip()
        raw_orders = message_plan.get("orders")
        if not isinstance(raw_orders, list):
            return default
        for item in raw_orders:
            if not isinstance(item, dict):
                continue
            segment = str(item.get("message") or "").strip()
            if segment != target:
                continue
            entities = self._coerce_extraction_contract(item.get("entities"))
            return self._normalize_extraction_contract(entities)
        return default

    def _has_useful_extraction(self, extraction: dict | None) -> bool:
        """Tell whether an extraction carries any true intent or non-empty section."""
        if not isinstance(extraction, dict):
            return False
        intents = self._normalize_intents(extraction.get("intents"))
        if any(intents.values()):
            return True
        return any(
            bool(extraction.get(key))
            for key in (
                "generic_memory",
                "review_fields",
                "review_management_fields",
                "order_fields",
                "cancel_order_fields",
            )
        )

    def _parse_json_object(self, text: str):
        """Parse JSON from LLM text, tolerating code fences and surrounding prose.

        Returns the decoded value (callers check that it is a dict) or ``None``
        when no valid JSON could be recovered.
        """
        candidate = (text or "").strip()
        if not candidate:
            return None
        if candidate.startswith("```"):
            candidate = re.sub(r"^```(?:json)?\s*", "", candidate, flags=re.IGNORECASE)
            candidate = re.sub(r"\s*```$", "", candidate)
        try:
            return json.loads(candidate)
        except json.JSONDecodeError:
            pass

        # Fall back to the outermost {...} span embedded in the text.
        match = re.search(r"\{.*\}", candidate, flags=re.DOTALL)
        if not match:
            logger.warning("Extracao sem JSON valido no texto retornado.")
            return None
        try:
            return json.loads(match.group(0))
        except json.JSONDecodeError:
            logger.warning("Extracao com JSON invalido apos recorte.")
            return None

    def _normalize_plate(self, value) -> str | None:
        """Return an upper-cased plate in LLLNLNN/LLLNANN or LLLNNNN form, else ``None``."""
        text = str(value or "").strip().upper()
        if not text:
            return None
        patterns = (r"[A-Z]{3}[0-9][A-Z0-9][0-9]{2}", r"[A-Z]{3}[0-9]{4}")
        # Try the text as given, then with separators such as "-" removed.
        for candidate in (text, re.sub(r"[^A-Z0-9]", "", text)):
            if any(re.fullmatch(pattern, candidate) for pattern in patterns):
                return candidate
        return None

    def _normalize_cpf(self, value) -> str | None:
        """Return the 11 digits of a CPF, or ``None`` when the digit count differs."""
        digits = re.sub(r"\D", "", str(value or ""))
        return digits if len(digits) == 11 else None

    def _normalize_positive_number(self, value) -> float | None:
        """Parse a strictly positive, finite number from a number or free text.

        Text rules: "r$" is dropped; "mil" multiplies by 1000 and
        "milhao/milhoes" by 1,000,000; with a comma present the value is read
        in Brazilian format ("." thousands, "," decimal); without a comma, dots
        are thousands separators only in the ``1.234.567`` shape and a decimal
        point otherwise (so ``"45000.50"`` is 45000.5).

        Returns ``None`` for booleans, non-positive, NaN/infinite or
        unparseable input.
        """
        # bool is a subclass of int; True/False are not quantities.
        if value is None or isinstance(value, bool):
            return None
        if isinstance(value, (int, float)):
            number = float(value)
            # The chained comparison also rejects NaN and infinity.
            return number if 0 < number < float("inf") else None

        text = self._normalize_text(str(value))
        text = text.replace("r$", "").strip()
        multiplier = 1
        for pattern, factor in self._MAGNITUDE_WORDS:
            if re.search(pattern, text):
                multiplier = factor
                text = re.sub(pattern, "", text).strip()
                break

        digits = re.sub(r"[^0-9,.\s]", "", text)
        if not digits:
            return None
        compact = re.sub(r"\s+", "", digits)
        if "," in compact:
            numeric = compact.replace(".", "").replace(",", ".")
        elif re.fullmatch(r"\d{1,3}(?:\.\d{3})+", compact):
            numeric = compact.replace(".", "")
        else:
            numeric = compact
        try:
            number = float(numeric) * multiplier
        except ValueError:
            return None
        return number if 0 < number < float("inf") else None

    def _normalize_vehicle_profile(self, value) -> list[str]:
        """Return the distinct allowed profile markers found in ``value``, in input order."""
        if value is None:
            return []
        allowed = {"suv", "sedan", "hatch", "pickup"}
        items = value if isinstance(value, list) else [value]
        normalized: list[str] = []
        for item in items:
            marker = self._normalize_text(str(item)).strip()
            if marker in allowed and marker not in normalized:
                normalized.append(marker)
        return normalized

    def _normalize_bool(self, value) -> bool | None:
        """Map booleans, 0/1 and sim/nao/true/false/yes/no text to a bool; else ``None``."""
        if isinstance(value, bool):
            return value
        if value is None:
            return None
        # Numeric 0/1 must not be lost to a truthiness shortcut (0 means False).
        if isinstance(value, (int, float)) and value in (0, 1):
            return bool(value)
        lowered = self._normalize_text(str(value)).strip()
        if lowered in {"sim", "true", "1", "yes"}:
            return True
        if lowered in {"nao", "false", "0", "no"}:
            return False
        return None

    def _normalize_review_datetime_text(self, value) -> str | None:
        """Normalise a review date/time string.

        Absolute formats are returned untouched. Relative expressions with
        "hoje", "amanha" or "depois de amanha" plus an ``HH:MM`` time are
        resolved against the local clock to ``dd/mm/YYYY HH:MM``. Any other
        non-empty text is returned as is; empty input yields ``None``.
        """
        text = str(value or "").strip()
        if not text:
            return None

        # Keep absolute formats that the handler already knows how to interpret.
        absolute_patterns = (
            r"^\d{1,2}[/-]\d{1,2}[/-]\d{4}\s+(?:as\s+)?\d{1,2}:\d{2}(?::\d{2})?$",
            r"^\d{4}[/-]\d{1,2}[/-]\d{1,2}\s+(?:as\s+)?\d{1,2}:\d{2}(?::\d{2})?$",
            r"^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}(?::\d{2})?(?:Z|[+-]\d{2}:\d{2})?$",
        )
        if any(re.match(pattern, text, flags=re.IGNORECASE) for pattern in absolute_patterns):
            return text

        normalized = self._normalize_text(text)
        # "depois de amanha" contains "amanha", so it must be tested first.
        if "depois de amanha" in normalized:
            day_offset = 2
        elif "amanha" in normalized:
            day_offset = 1
        elif "hoje" in normalized:
            day_offset = 0
        else:
            return text

        time_match = re.search(r"\b([01]?\d|2[0-3]):([0-5]\d)\b", normalized)
        if not time_match:
            return text
        hour = int(time_match.group(1))
        minute = int(time_match.group(2))
        target_date = datetime.now() + timedelta(days=day_offset)
        return f"{target_date.strftime('%d/%m/%Y')} {hour:02d}:{minute:02d}"

    def _normalize_generic_fields(self, data) -> dict:
        """Keep only the valid generic fields: placa, cpf, orcamento_max, perfil_veiculo."""
        if not isinstance(data, dict):
            return {}
        extracted: dict = {}
        plate = self._normalize_plate(data.get("placa"))
        if plate:
            extracted["placa"] = plate
        cpf = self._normalize_cpf(data.get("cpf"))
        if cpf:
            extracted["cpf"] = cpf
        budget = self._normalize_positive_number(data.get("orcamento_max"))
        if budget:
            extracted["orcamento_max"] = int(round(budget))
        profile = self._normalize_vehicle_profile(data.get("perfil_veiculo"))
        if profile:
            extracted["perfil_veiculo"] = profile
        return extracted

    def _normalize_review_fields(self, data) -> dict:
        """Keep only the valid review-scheduling fields found in ``data``."""
        if not isinstance(data, dict):
            return {}
        extracted: dict = {}
        plate = self._normalize_plate(data.get("placa"))
        if plate:
            extracted["placa"] = plate
        date_time = self._normalize_review_datetime_text(data.get("data_hora"))
        if date_time:
            extracted["data_hora"] = date_time
        model = str(data.get("modelo") or "").strip(" ,.;")
        if model:
            extracted["modelo"] = model.title()
        year = self._normalize_positive_number(data.get("ano"))
        if year:
            year_int = int(round(year))
            if 1900 <= year_int <= 2100:
                extracted["ano"] = year_int
        km = self._normalize_positive_number(data.get("km"))
        if km:
            extracted["km"] = int(round(km))
        reviewed = self._normalize_bool(data.get("revisao_previa_concessionaria"))
        if reviewed is not None:
            extracted["revisao_previa_concessionaria"] = reviewed
        return extracted

    def _extract_review_protocol_from_text(self, text: str) -> str | None:
        """Return the first ``REV-...`` protocol found in the upper-cased text, if any."""
        match = re.search(r"\bREV-[A-Z0-9\-]+\b", str(text or "").upper())
        return match.group(0) if match else None

    def _normalize_review_management_fields(self, data) -> dict:
        """Keep only the valid review cancel/reschedule fields found in ``data``."""
        if not isinstance(data, dict):
            return {}
        extracted: dict = {}
        # The protocol may arrive under any of these keys.
        raw_protocol = (
            data.get("protocolo")
            or data.get("numero_protocolo")
            or data.get("codigo")
        )
        protocol = self._extract_review_protocol_from_text(str(raw_protocol or ""))
        if protocol:
            extracted["protocolo"] = protocol
        new_datetime = self._normalize_review_datetime_text(data.get("nova_data_hora"))
        if new_datetime:
            extracted["nova_data_hora"] = new_datetime
        reason = str(data.get("motivo") or "").strip(" .;")
        if reason:
            extracted["motivo"] = reason
        return extracted

    def _normalize_order_fields(self, data) -> dict:
        """Keep only the valid order-creation fields (cpf, valor_veiculo)."""
        if not isinstance(data, dict):
            return {}
        extracted: dict = {}
        cpf = self._normalize_cpf(data.get("cpf"))
        if cpf:
            extracted["cpf"] = cpf
        value = self._normalize_positive_number(data.get("valor_veiculo"))
        if value:
            extracted["valor_veiculo"] = round(value, 2)
        return extracted

    def _normalize_cancel_order_fields(self, data) -> dict:
        """Keep only the valid order-cancellation fields (numero_pedido, motivo)."""
        if not isinstance(data, dict):
            return {}
        extracted: dict = {}
        order_number = str(data.get("numero_pedido") or "").strip().upper()
        if order_number and re.fullmatch(r"PED-[A-Z0-9\-]+", order_number):
            extracted["numero_pedido"] = order_number
        reason = str(data.get("motivo") or "").strip(" .;")
        if reason:
            extracted["motivo"] = reason
        return extracted

    def _normalize_intents(self, data) -> dict:
        """Return every known intent flag as a strict bool (missing/unknown -> False)."""
        if not isinstance(data, dict):
            data = {}
        intent_names = (
            "review_schedule",
            "review_list",
            "review_cancel",
            "review_reschedule",
            "order_create",
            "order_cancel",
        )
        return {name: bool(self._normalize_bool(data.get(name))) for name in intent_names}

    def _has_operational_intent(self, extracted_entities: dict | None) -> bool:
        """Tell whether the extraction has a true intent or any operational field."""
        if not isinstance(extracted_entities, dict):
            return False
        intents = self._normalize_intents(extracted_entities.get("intents"))
        if any(intents.values()):
            return True
        return any(
            bool(extracted_entities.get(key))
            for key in ("review_fields", "review_management_fields", "order_fields", "cancel_order_fields")
        )

    def _try_prefill_review_fields_from_memory(self, user_id: int | None, payload: dict) -> None:
        """Fill ``payload["placa"]`` in place from generic memory when it is still missing."""
        if user_id is None:
            return
        context = self._get_user_context(user_id)
        if not context:
            return
        memory = context.get("generic_memory", {})
        if payload.get("placa") is None:
            plate = self._normalize_plate(memory.get("placa"))
            if plate:
                payload["placa"] = plate

    def _queue_order(self, user_id: int | None, domain: str, order_message: str) -> bool:
        """Append an order to the user's queue.

        Returns ``True`` when the order was queued and ``False`` when it was
        skipped (no live context, or a "general" item, which is never queued).
        """
        context = self._get_user_context(user_id)
        if not context:
            return False
        if domain == "general":
            return False
        queue = context.setdefault("order_queue", [])
        queue.append(
            {
                "domain": domain,
                "message": (order_message or "").strip(),
                "memory_seed": self._new_tab_memory(user_id=user_id),
                "created_at": datetime.utcnow().isoformat(),
            }
        )
        return True

    def _pop_next_order(self, user_id: int | None) -> dict | None:
        """Remove and return the oldest queued order, or ``None`` when there is none."""
        context = self._get_user_context(user_id)
        if not context:
            return None
        queue = context.setdefault("order_queue", [])
        if not queue:
            return None
        return queue.pop(0)

    def _prepare_message_for_single_order(
        self,
        message: str,
        user_id: int | None,
        routing_plan: dict | None = None,
    ) -> tuple[str, str | None, str | None]:
        """Pick the message to handle now and queue the remaining orders.

        Returns ``(routing_message, queue_notice, early_response)``. When
        ``early_response`` is set the caller should answer with it right away
        (an open flow must be concluded first).
        """
        context = self._get_user_context(user_id)
        if not context:
            return message, None, None

        active_domain = context.get("active_domain", "general")
        orders_raw = routing_plan.get("orders") if isinstance(routing_plan, dict) else None
        extracted_orders: list[dict] = []
        if isinstance(orders_raw, list):
            for item in orders_raw:
                if not isinstance(item, dict):
                    continue
                segment = str(item.get("message") or "").strip()
                if segment:
                    extracted_orders.append(
                        {"domain": self._coerce_domain(item.get("domain")), "message": segment}
                    )
        if not extracted_orders:
            extracted_orders = [{"domain": "general", "message": (message or "").strip()}]

        def open_flow_response(queued_count: int) -> str:
            # Prompt for the open flow, followed by the queue hint when something was queued.
            prompt = self._render_open_flow_prompt(user_id=user_id, domain=active_domain)
            queue_hint = self._render_queue_notice(queued_count)
            return f"{prompt}\n{queue_hint}" if queue_hint else prompt

        has_open_flow = self._has_open_flow(user_id=user_id, domain=active_domain)

        if len(extracted_orders) <= 1:
            inferred = extracted_orders[0]["domain"]
            if inferred != "general" and inferred != active_domain and has_open_flow:
                queued = self._queue_order(user_id=user_id, domain=inferred, order_message=message)
                return message, None, open_flow_response(1 if queued else 0)
            return message, None, None

        if has_open_flow:
            # Count only orders that were really queued ("general" ones are skipped).
            queued_count = sum(
                self._queue_order(user_id=user_id, domain=order["domain"], order_message=order["message"])
                for order in extracted_orders
                if order["domain"] != active_domain
            )
            return message, None, open_flow_response(queued_count)

        first = extracted_orders[0]
        queued_count = sum(
            self._queue_order(user_id=user_id, domain=order["domain"], order_message=order["message"])
            for order in extracted_orders[1:]
        )
        context["active_domain"] = first["domain"]
        return first["message"], self._render_queue_notice(queued_count), None

    def _compose_order_aware_response(self, response: str, user_id: int | None, queue_notice: str | None = None) -> str:
        """Prefix the response with the queue notice, when there is one."""
        lines = []
        if queue_notice:
            lines.append(queue_notice)
        lines.append(response)
        return "\n".join(lines)

    def _render_queue_notice(self, queued_count: int) -> str | None:
        """Return the "orders noted" sentence for ``queued_count``, or ``None`` for <= 0."""
        if queued_count <= 0:
            return None
        if queued_count == 1:
            return "Anotei mais 1 pedido e sigo nele quando voce disser 'continuar'."
        return f"Anotei mais {queued_count} pedidos e sigo neles conforme voce for dizendo 'continuar'."

    def _render_open_flow_prompt(self, user_id: int | None, domain: str) -> str:
        """Return the prompt asking the user to finish the flow currently open in ``domain``."""
        if domain == "review" and user_id is not None:
            draft = self.PENDING_REVIEW_DRAFTS.get(user_id)
            if draft:
                missing = [field for field in REVIEW_REQUIRED_FIELDS if field not in draft.get("payload", {})]
                if missing:
                    return self._render_missing_review_fields_prompt(missing)

            management_draft = self.PENDING_REVIEW_MANAGEMENT_DRAFTS.get(user_id)
            if management_draft:
                action = management_draft.get("action", "cancel")
                payload = management_draft.get("payload", {})
                if action == "reschedule":
                    missing = [field for field in ("protocolo", "nova_data_hora") if field not in payload]
                    if missing:
                        return self._render_missing_review_reschedule_fields_prompt(missing)
                else:
                    missing = [field for field in ("protocolo",) if field not in payload]
                    if missing:
                        return self._render_missing_review_cancel_fields_prompt(missing)

            pending = self.PENDING_REVIEW_CONFIRMATIONS.get(user_id)
            if pending:
                return "Antes de mudar de assunto, me confirme se podemos concluir seu agendamento de revisao."

            reuse_pending = self.PENDING_REVIEW_REUSE_CONFIRMATIONS.get(user_id)
            if reuse_pending:
                return self._render_review_reuse_question()

        if domain == "sales" and user_id is not None:
            draft = self.PENDING_ORDER_DRAFTS.get(user_id)
            if draft:
                missing = [field for field in ORDER_REQUIRED_FIELDS if field not in draft.get("payload", {})]
                if missing:
                    return self._render_missing_order_fields_prompt(missing)

            cancel_draft = self.PENDING_CANCEL_ORDER_DRAFTS.get(user_id)
            if cancel_draft:
                missing = [
                    field
                    for field in CANCEL_ORDER_REQUIRED_FIELDS
                    if field not in cancel_draft.get("payload", {})
                ]
                if missing:
                    return self._render_missing_cancel_order_fields_prompt(missing)

        return "Vamos concluir este assunto primeiro e ja sigo com o proximo em seguida."

    def _build_next_order_transition(self, domain: str) -> str:
        """Return the sentence that introduces the next queued order for ``domain``."""
        if domain == "sales":
            return "Agora, sobre a compra do veiculo:"
        if domain == "review":
            return "Agora, sobre o agendamento da revisao:"
        return "Agora, sobre o proximo assunto:"

    async def _maybe_auto_advance_next_order(self, base_response: str, user_id: int | None) -> str:
        """Offer the next queued order once the active flow is closed.

        Pops the next order into ``pending_switch`` and appends an invitation
        to say 'continuar'; otherwise returns ``base_response`` unchanged.
        """
        context = self._get_user_context(user_id)
        if not context:
            return base_response
        if context.get("pending_switch"):
            return base_response
        active_domain = context.get("active_domain", "general")
        if self._has_open_flow(user_id=user_id, domain=active_domain):
            return base_response
        next_order = self._pop_next_order(user_id=user_id)
        if not next_order:
            return base_response

        context["pending_switch"] = {
            "source_domain": active_domain,
            "target_domain": next_order["domain"],
            "queued_message": next_order["message"],
            "memory_seed": dict(next_order.get("memory_seed") or self._new_tab_memory(user_id=user_id)),
            "expires_at": datetime.utcnow() + timedelta(minutes=self.PENDING_SWITCH_TTL_MINUTES),
        }
        transition = self._build_next_order_transition(next_order["domain"])
        return (
            f"{base_response}\n\n"
            f"{transition}\n"
            "Tenho um proximo pedido na fila. Quando quiser, diga 'continuar' para eu seguir nele."
        )

    def _domain_from_intents(self, intents: dict | None) -> str:
        """Return "review" or "sales" when one side has strictly more true intents, else "general"."""
        normalized = self._normalize_intents(intents)
        review_score = sum(
            int(normalized.get(name, False))
            for name in ("review_schedule", "review_list", "review_cancel", "review_reschedule")
        )
        sales_score = sum(int(normalized.get(name, False)) for name in ("order_create", "order_cancel"))
        if review_score > sales_score:
            return "review"
        if sales_score > review_score:
            return "sales"
        return "general"

    def _is_context_switch_confirmation(self, message: str) -> bool:
        """Tell whether the message is a yes/no answer."""
        return self._is_affirmative_message(message) or self._is_negative_message(message)

    def _is_continue_queue_message(self, message: str) -> bool:
        """Tell whether the message is one of the 'continue' commands."""
        normalized = self._normalize_text(message).strip()
        normalized = re.sub(r"[.!?,;:]+$", "", normalized)
        return normalized in {"continuar", "pode continuar", "seguir", "pode seguir", "proximo", "segue"}

    async def _try_continue_queued_order(self, message: str, user_id: int | None) -> str | None:
        """Resume the queued order held in ``pending_switch`` when the user agrees.

        Returns the reply for the queued order, a short acknowledgement when the
        user declines, or ``None`` when this message is not about the queue.
        """
        context = self._get_user_context(user_id)
        if not context:
            return None
        pending_switch = context.get("pending_switch")
        if not isinstance(pending_switch, dict):
            return None
        if pending_switch.get("expires_at") and pending_switch["expires_at"] < datetime.utcnow():
            context["pending_switch"] = None
            return None
        # Switch requests created by _handle_context_switch carry no queued message.
        queued_message = str(pending_switch.get("queued_message") or "").strip()
        if not queued_message:
            return None
        if self._is_negative_message(message):
            context["pending_switch"] = None
            return "Tudo bem. Mantive o proximo pedido fora da fila por enquanto."
        if not (self._is_continue_queue_message(message) or self._is_affirmative_message(message)):
            return None

        target_domain = str(pending_switch.get("target_domain") or "general")
        memory_seed = dict(pending_switch.get("memory_seed") or {})
        self._apply_domain_switch(user_id=user_id, target_domain=target_domain)
        refreshed = self._get_user_context(user_id)
        if refreshed is not None:
            refreshed["generic_memory"] = memory_seed
        transition = self._build_next_order_transition(target_domain)
        next_response = await self.handle_message(queued_message, user_id=user_id)
        return f"{transition}\n{next_response}"

    def _has_open_flow(self, user_id: int | None, domain: str) -> bool:
        """Tell whether the user has any pending draft/confirmation in ``domain``."""
        if user_id is None:
            return False
        if domain == "review":
            return bool(
                self.PENDING_REVIEW_DRAFTS.get(user_id)
                or self.PENDING_REVIEW_CONFIRMATIONS.get(user_id)
                or self.PENDING_REVIEW_MANAGEMENT_DRAFTS.get(user_id)
                or self.PENDING_REVIEW_REUSE_CONFIRMATIONS.get(user_id)
            )
        if domain == "sales":
            return bool(
                self.PENDING_ORDER_DRAFTS.get(user_id)
                or self.PENDING_CANCEL_ORDER_DRAFTS.get(user_id)
            )
        return False

    def _apply_domain_switch(self, user_id: int | None, target_domain: str) -> None:
        """Close the previous domain's pending state and activate ``target_domain``."""
        context = self._get_user_context(user_id)
        if not context:
            return
        previous_domain = context.get("active_domain", "general")
        if previous_domain == "review":
            self._reset_pending_review_states(user_id=user_id)
        if previous_domain == "sales":
            self._reset_pending_order_states(user_id=user_id)
        context["active_domain"] = target_domain
        context["generic_memory"] = self._new_tab_memory(user_id=user_id)
        context["pending_switch"] = None

    def _handle_context_switch(
        self,
        message: str,
        user_id: int | None,
        target_domain_hint: str = "general",
    ) -> str | None:
        """Resolve or start a domain-switch confirmation.

        Returns the text to send to the user, or ``None`` when no switch
        handling applies to this message.
        """
        context = self._get_user_context(user_id)
        if not context:
            return None

        pending_switch = context.get("pending_switch")
        if pending_switch:
            if pending_switch["expires_at"] < datetime.utcnow():
                context["pending_switch"] = None
            elif self._is_context_switch_confirmation(message):
                if self._is_affirmative_message(message):
                    target_domain = pending_switch["target_domain"]
                    self._apply_domain_switch(user_id=user_id, target_domain=target_domain)
                    return self._render_context_switched_message(target_domain=target_domain)
                context["pending_switch"] = None
                return "Perfeito, vamos continuar no fluxo atual."

        current_domain = context.get("active_domain", "general")
        target_domain = target_domain_hint
        if target_domain == "general" or target_domain == current_domain:
            return None
        if not self._has_open_flow(user_id=user_id, domain=current_domain):
            return None

        context["pending_switch"] = {
            "source_domain": current_domain,
            "target_domain": target_domain,
            "expires_at": datetime.utcnow() + timedelta(minutes=self.PENDING_SWITCH_TTL_MINUTES),
        }
        return self._render_context_switch_confirmation(
            source_domain=current_domain,
            target_domain=target_domain,
        )

    def _update_active_domain(self, user_id: int | None, domain_hint: str = "general") -> None:
        """Set the active domain to ``domain_hint`` unless the hint is "general"."""
        context = self._get_user_context(user_id)
        if not context:
            return
        if domain_hint != "general":
            context["active_domain"] = domain_hint

    def _domain_label(self, domain: str) -> str:
        """Return the user-facing label of a domain."""
        labels = {
            "review": "agendamento de revisao",
            "sales": "compra de veiculo",
            "general": "atendimento geral",
        }
        return labels.get(domain, "atendimento")

    def _render_context_switch_confirmation(self, source_domain: str, target_domain: str) -> str:
        """Return the question asking the user to confirm leaving the current flow."""
        return (
            f"Entendi que voce quer sair de {self._domain_label(source_domain)} "
            f"e ir para {self._domain_label(target_domain)}. Tem certeza?"
        )

    def _render_context_switched_message(self, target_domain: str) -> str:
        """Return the message confirming that the context was switched."""
        return f"Certo, contexto anterior encerrado. Vamos seguir com {self._domain_label(target_domain)}."

    def _build_context_summary(self, user_id: int | None) -> str:
        context = self._get_user_context(user_id)
        if not context:
            return "Contexto de conversa: sem contexto ativo."
domain = context.get("active_domain", "general") memory = context.get("generic_memory", {}) order_queue = context.get("order_queue", []) summary = [f"Contexto de conversa ativo: {self._domain_label(domain)}."] if memory: summary.append(f"Memoria generica temporaria: {memory}.") if order_queue: summary.append(f"Fila de pedidos pendentes: {len(order_queue)}.") return " ".join(summary) def _should_use_deterministic_response(self, tool_name: str) -> bool: return tool_name in DETERMINISTIC_RESPONSE_TOOLS def _normalize_text(self, text: str) -> str: normalized = unicodedata.normalize("NFKD", text or "") ascii_text = normalized.encode("ascii", "ignore").decode("ascii") return ascii_text.lower() def _is_low_value_response(self, text: str) -> bool: return text.strip().lower() in LOW_VALUE_RESPONSES async def _try_handle_review_management( self, message: str, user_id: int | None, extracted_fields: dict | None = None, intents: dict | None = None, ) -> str | None: if user_id is None: return None normalized_intents = self._normalize_intents(intents) draft = self.PENDING_REVIEW_MANAGEMENT_DRAFTS.get(user_id) if draft and draft["expires_at"] < datetime.utcnow(): self.PENDING_REVIEW_MANAGEMENT_DRAFTS.pop(user_id, None) draft = None has_list_intent = normalized_intents.get("review_list", False) has_cancel_intent = normalized_intents.get("review_cancel", False) has_reschedule_intent = normalized_intents.get("review_reschedule", False) if has_list_intent: # Listagem e acao terminal; limpa rascunhos de revisao para evitar conflito de contexto. 
self._reset_pending_review_states(user_id=user_id) try: tool_result = await self.registry.execute( "listar_agendamentos_revisao", {"limite": 20}, user_id=user_id, ) except HTTPException as exc: return self._http_exception_detail(exc) return self._fallback_format_tool_result("listar_agendamentos_revisao", tool_result) if not has_cancel_intent and not has_reschedule_intent and draft is None: return None if draft is None: action = "reschedule" if has_reschedule_intent else "cancel" draft = { "action": action, "payload": {}, "expires_at": datetime.utcnow() + timedelta(minutes=PENDING_REVIEW_DRAFT_TTL_MINUTES), } else: if has_reschedule_intent: draft["action"] = "reschedule" elif has_cancel_intent: draft["action"] = "cancel" extracted = self._normalize_review_management_fields(extracted_fields) if "protocolo" not in extracted: inferred_protocol = self._extract_review_protocol_from_text(message) if inferred_protocol: extracted["protocolo"] = inferred_protocol action = draft.get("action", "cancel") if ( action == "cancel" and "motivo" not in extracted and draft["payload"].get("protocolo") and not has_cancel_intent ): free_text = str(message or "").strip() if free_text and len(free_text) >= 4 and not self._is_affirmative_message(free_text): extracted["motivo"] = free_text draft["payload"].update(extracted) draft["expires_at"] = datetime.utcnow() + timedelta(minutes=PENDING_REVIEW_DRAFT_TTL_MINUTES) self.PENDING_REVIEW_MANAGEMENT_DRAFTS[user_id] = draft if action == "reschedule": missing = [field for field in ("protocolo", "nova_data_hora") if field not in draft["payload"]] if missing: return self._render_missing_review_reschedule_fields_prompt(missing) try: tool_result = await self.registry.execute( "editar_data_revisao", { "protocolo": draft["payload"]["protocolo"], "nova_data_hora": draft["payload"]["nova_data_hora"], }, user_id=user_id, ) except HTTPException as exc: return self._http_exception_detail(exc) finally: self.PENDING_REVIEW_MANAGEMENT_DRAFTS.pop(user_id, 
None) return self._fallback_format_tool_result("editar_data_revisao", tool_result) missing = [field for field in ("protocolo",) if field not in draft["payload"]] if missing: return self._render_missing_review_cancel_fields_prompt(missing) try: tool_result = await self.registry.execute( "cancelar_agendamento_revisao", { "protocolo": draft["payload"]["protocolo"], "motivo": draft["payload"].get("motivo"), }, user_id=user_id, ) except HTTPException as exc: return self._http_exception_detail(exc) finally: self.PENDING_REVIEW_MANAGEMENT_DRAFTS.pop(user_id, None) return self._fallback_format_tool_result("cancelar_agendamento_revisao", tool_result) def _render_missing_review_fields_prompt(self, missing_fields: list[str]) -> str: labels = { "placa": "a placa do veiculo", "data_hora": "a data e hora desejada para a revisao", "modelo": "o modelo do veiculo", "ano": "o ano do veiculo", "km": "a quilometragem atual (km)", "revisao_previa_concessionaria": "se ja fez revisao na concessionaria (sim/nao)", } itens = [f"- {labels[field]}" for field in missing_fields] return "Para agendar sua revisao, preciso dos dados abaixo:\n" + "\n".join(itens) def _render_missing_review_cancel_fields_prompt(self, missing_fields: list[str]) -> str: labels = { "protocolo": "o protocolo da revisao (ex.: REV-20260310-ABC12345)", } itens = [f"- {labels[field]}" for field in missing_fields] return "Para cancelar o agendamento de revisao, preciso dos dados abaixo:\n" + "\n".join(itens) def _render_missing_review_reschedule_fields_prompt(self, missing_fields: list[str]) -> str: labels = { "protocolo": "o protocolo da revisao (ex.: REV-20260310-ABC12345)", "nova_data_hora": "a nova data e hora desejada para a revisao", } itens = [f"- {labels[field]}" for field in missing_fields] return "Para remarcar sua revisao, preciso dos dados abaixo:\n" + "\n".join(itens) def _render_review_reuse_question(self) -> str: return ( "Deseja usar os mesmos dados do ultimo veiculo e informar so a data/hora da revisao? 
" "(sim/nao)" ) def _store_last_review_package(self, user_id: int | None, payload: dict | None) -> None: if user_id is None or not isinstance(payload, dict): return package = { "placa": payload.get("placa"), "modelo": payload.get("modelo"), "ano": payload.get("ano"), "km": payload.get("km"), "revisao_previa_concessionaria": payload.get("revisao_previa_concessionaria"), } sanitized = {k: v for k, v in package.items() if v is not None} required = {"placa", "modelo", "ano", "km", "revisao_previa_concessionaria"} if not required.issubset(sanitized.keys()): return self.LAST_REVIEW_PACKAGES[user_id] = { "payload": sanitized, "expires_at": datetime.utcnow() + timedelta(minutes=LAST_REVIEW_PACKAGE_TTL_MINUTES), } def _get_last_review_package(self, user_id: int | None) -> dict | None: if user_id is None: return None cached = self.LAST_REVIEW_PACKAGES.get(user_id) if not cached: return None if cached["expires_at"] < datetime.utcnow(): self.LAST_REVIEW_PACKAGES.pop(user_id, None) return None payload = cached.get("payload") return dict(payload) if isinstance(payload, dict) else None def _is_valid_cpf(self, cpf: str) -> bool: digits = re.sub(r"\D", "", cpf or "") if len(digits) != 11: return False if digits == digits[0] * 11: return False numbers = [int(d) for d in digits] sum_first = sum(n * w for n, w in zip(numbers[:9], range(10, 1, -1))) first_digit = 11 - (sum_first % 11) first_digit = 0 if first_digit >= 10 else first_digit if first_digit != numbers[9]: return False sum_second = sum(n * w for n, w in zip(numbers[:10], range(11, 1, -1))) second_digit = 11 - (sum_second % 11) second_digit = 0 if second_digit >= 10 else second_digit return second_digit == numbers[10] def _try_prefill_order_value_from_memory(self, user_id: int | None, payload: dict) -> None: # So preenche quando o usuario ainda nao informou valor explicitamente no fluxo atual. 
if user_id is None or payload.get("valor_veiculo") is not None: return context = self._get_user_context(user_id) if not context: return memory = context.get("generic_memory", {}) budget = memory.get("orcamento_max") if isinstance(budget, (int, float)) and budget > 0: # Reaproveita o orcamento capturado anteriormente como valor base do pedido. payload["valor_veiculo"] = float(budget) def _render_missing_order_fields_prompt(self, missing_fields: list[str]) -> str: labels = { "cpf": "o CPF do cliente", "valor_veiculo": "o valor do veiculo (R$)", } itens = [f"- {labels[field]}" for field in missing_fields] return "Para realizar o pedido, preciso dos dados abaixo:\n" + "\n".join(itens) def _render_missing_cancel_order_fields_prompt(self, missing_fields: list[str]) -> str: labels = { "numero_pedido": "o numero do pedido (ex.: PED-20260305123456-ABC123)", "motivo": "o motivo do cancelamento", } itens = [f"- {labels[field]}" for field in missing_fields] return "Para cancelar o pedido, preciso dos dados abaixo:\n" + "\n".join(itens) # Em vez de tentar entender tudo de uma vez, o bot mantem um "estado" do que ja sabe e vai perguntando apenas o que falta (os "slots" vazios) ate que a tarefa possa ser completada. async def _try_collect_and_schedule_review( self, message: str, user_id: int | None, extracted_fields: dict | None = None, intents: dict | None = None, ) -> str | None: if user_id is None: return None normalized_intents = self._normalize_intents(intents) has_intent = normalized_intents.get("review_schedule", False) has_management_intent = ( normalized_intents.get("review_list", False) or normalized_intents.get("review_cancel", False) or normalized_intents.get("review_reschedule", False) ) # Nao inicia slot-filling quando a intencao atual nao e de agendamento. if has_management_intent: # Se o usuario mudou para gerenciamento de revisao, encerra # qualquer coleta pendente de novo agendamento. 
self.PENDING_REVIEW_DRAFTS.pop(user_id, None) self.PENDING_REVIEW_REUSE_CONFIRMATIONS.pop(user_id, None) return None # Reaproveita rascunho anterior do usuario, se ainda estiver valido. draft = self.PENDING_REVIEW_DRAFTS.get(user_id) if draft and draft["expires_at"] < datetime.utcnow(): self.PENDING_REVIEW_DRAFTS.pop(user_id, None) draft = None extracted = self._normalize_review_fields(extracted_fields) pending_reuse = self.PENDING_REVIEW_REUSE_CONFIRMATIONS.get(user_id) if pending_reuse and pending_reuse["expires_at"] < datetime.utcnow(): self.PENDING_REVIEW_REUSE_CONFIRMATIONS.pop(user_id, None) pending_reuse = None if pending_reuse: should_reuse = False if self._is_negative_message(message): self.PENDING_REVIEW_REUSE_CONFIRMATIONS.pop(user_id, None) pending_reuse = None elif self._is_affirmative_message(message) or "data_hora" in extracted: should_reuse = True else: return self._render_review_reuse_question() if should_reuse: seed_payload = dict(pending_reuse.get("payload") or {}) if draft is None: draft = { "payload": seed_payload, "expires_at": datetime.utcnow() + timedelta(minutes=PENDING_REVIEW_DRAFT_TTL_MINUTES), } else: for key, value in seed_payload.items(): draft["payload"].setdefault(key, value) self.PENDING_REVIEW_REUSE_CONFIRMATIONS.pop(user_id, None) pending_reuse = None if "data_hora" not in extracted: self.PENDING_REVIEW_DRAFTS[user_id] = draft return "Perfeito. Me informe apenas a data e hora desejada para a revisao." if has_intent and draft is None and not extracted: last_package = self._get_last_review_package(user_id=user_id) if last_package: self.PENDING_REVIEW_REUSE_CONFIRMATIONS[user_id] = { "payload": last_package, "expires_at": datetime.utcnow() + timedelta(minutes=PENDING_REVIEW_DRAFT_TTL_MINUTES), } return self._render_review_reuse_question() # Se houver rascunho de revisao, mas o usuario mudou para outra # intencao operacional (ex.: compra/estoque), descarta o rascunho. 
if ( draft and not has_intent and ( normalized_intents.get("order_create", False) or normalized_intents.get("order_cancel", False) ) and not extracted ): self.PENDING_REVIEW_DRAFTS.pop(user_id, None) return None # Sem intencao de revisao e sem rascunho aberto: nao interfere no fluxo normal. if not has_intent and draft is None: return None if draft is None: draft = {"payload": {}, "expires_at": datetime.utcnow() + timedelta(minutes=PENDING_REVIEW_DRAFT_TTL_MINUTES)} # Merge incremental: apenas atualiza os campos detectados na mensagem atual. draft["payload"].update(extracted) self._try_prefill_review_fields_from_memory(user_id=user_id, payload=draft["payload"]) # Se o usuario responder apenas "sim/nao" no follow-up, preenche o slot booleano. if ( "revisao_previa_concessionaria" not in draft["payload"] and draft["payload"] and not extracted ): if self._is_affirmative_message(message): draft["payload"]["revisao_previa_concessionaria"] = True elif self._is_negative_message(message): draft["payload"]["revisao_previa_concessionaria"] = False draft["expires_at"] = datetime.utcnow() + timedelta(minutes=PENDING_REVIEW_DRAFT_TTL_MINUTES) self.PENDING_REVIEW_DRAFTS[user_id] = draft # Enquanto faltar campo obrigatorio, responde de forma deterministica # (sem depender do LLM para lembrar contexto). missing = [field for field in REVIEW_REQUIRED_FIELDS if field not in draft["payload"]] if missing: return self._render_missing_review_fields_prompt(missing) try: # Com payload completo, executa a tool de agendamento. tool_result = await self.registry.execute( "agendar_revisao", draft["payload"], user_id=user_id, ) except HTTPException as exc: # Se houver conflito com sugestao de horario, salva para confirmar com "pode/sim". self._capture_review_confirmation_suggestion( tool_name="agendar_revisao", arguments=draft["payload"], exc=exc, user_id=user_id, ) return self._http_exception_detail(exc) finally: # Limpa o rascunho apos tentativa final para evitar estado sujo. 
self.PENDING_REVIEW_DRAFTS.pop(user_id, None) self._store_last_review_package(user_id=user_id, payload=draft["payload"]) return self._fallback_format_tool_result("agendar_revisao", tool_result) async def _try_collect_and_create_order( self, message: str, user_id: int | None, extracted_fields: dict | None = None, intents: dict | None = None, ) -> str | None: if user_id is None: return None normalized_intents = self._normalize_intents(intents) draft = self.PENDING_ORDER_DRAFTS.get(user_id) if draft and draft["expires_at"] < datetime.utcnow(): self.PENDING_ORDER_DRAFTS.pop(user_id, None) draft = None extracted = self._normalize_order_fields(extracted_fields) has_intent = normalized_intents.get("order_create", False) if ( draft and not has_intent and ( normalized_intents.get("review_schedule", False) or normalized_intents.get("review_list", False) or normalized_intents.get("review_cancel", False) or normalized_intents.get("review_reschedule", False) or normalized_intents.get("order_cancel", False) ) and not extracted ): self.PENDING_ORDER_DRAFTS.pop(user_id, None) return None if not has_intent and draft is None: return None if draft is None: draft = { "payload": {}, "expires_at": datetime.utcnow() + timedelta(minutes=PENDING_ORDER_DRAFT_TTL_MINUTES), } draft["payload"].update(extracted) self._try_prefill_order_value_from_memory(user_id=user_id, payload=draft["payload"]) cpf_value = draft["payload"].get("cpf") if cpf_value and not self._is_valid_cpf(str(cpf_value)): draft["payload"].pop("cpf", None) self.PENDING_ORDER_DRAFTS[user_id] = draft return "Para seguir com o pedido, preciso de um CPF valido com 11 digitos." 
valor = draft["payload"].get("valor_veiculo") if valor is not None: try: parsed = float(valor) if parsed <= 0: draft["payload"].pop("valor_veiculo", None) else: draft["payload"]["valor_veiculo"] = round(parsed, 2) except (TypeError, ValueError): draft["payload"].pop("valor_veiculo", None) draft["expires_at"] = datetime.utcnow() + timedelta(minutes=PENDING_ORDER_DRAFT_TTL_MINUTES) self.PENDING_ORDER_DRAFTS[user_id] = draft missing = [field for field in ORDER_REQUIRED_FIELDS if field not in draft["payload"]] if missing: return self._render_missing_order_fields_prompt(missing) try: tool_result = await self.registry.execute( "realizar_pedido", draft["payload"], user_id=user_id, ) except HTTPException as exc: return self._http_exception_detail(exc) finally: self.PENDING_ORDER_DRAFTS.pop(user_id, None) return self._fallback_format_tool_result("realizar_pedido", tool_result) async def _try_collect_and_cancel_order( self, message: str, user_id: int | None, extracted_fields: dict | None = None, intents: dict | None = None, ) -> str | None: if user_id is None: return None normalized_intents = self._normalize_intents(intents) draft = self.PENDING_CANCEL_ORDER_DRAFTS.get(user_id) if draft and draft["expires_at"] < datetime.utcnow(): self.PENDING_CANCEL_ORDER_DRAFTS.pop(user_id, None) draft = None extracted = self._normalize_cancel_order_fields(extracted_fields) has_intent = normalized_intents.get("order_cancel", False) if ( draft and not has_intent and ( normalized_intents.get("review_schedule", False) or normalized_intents.get("review_list", False) or normalized_intents.get("review_cancel", False) or normalized_intents.get("review_reschedule", False) or normalized_intents.get("order_create", False) ) and not extracted ): self.PENDING_CANCEL_ORDER_DRAFTS.pop(user_id, None) return None if not has_intent and draft is None: return None if draft is None: draft = { "payload": {}, "expires_at": datetime.utcnow() + timedelta(minutes=PENDING_CANCEL_ORDER_DRAFT_TTL_MINUTES), } if ( 
"motivo" not in extracted and draft["payload"].get("numero_pedido") and not has_intent ): free_text = (message or "").strip() if free_text and len(free_text) >= 4: extracted["motivo"] = free_text draft["payload"].update(extracted) draft["expires_at"] = datetime.utcnow() + timedelta(minutes=PENDING_CANCEL_ORDER_DRAFT_TTL_MINUTES) self.PENDING_CANCEL_ORDER_DRAFTS[user_id] = draft missing = [field for field in CANCEL_ORDER_REQUIRED_FIELDS if field not in draft["payload"]] if missing: return self._render_missing_cancel_order_fields_prompt(missing) try: tool_result = await self.registry.execute( "cancelar_pedido", draft["payload"], user_id=user_id, ) except HTTPException as exc: return self._http_exception_detail(exc) finally: self.PENDING_CANCEL_ORDER_DRAFTS.pop(user_id, None) return self._fallback_format_tool_result("cancelar_pedido", tool_result) def _is_affirmative_message(self, text: str) -> bool: normalized = self._normalize_text(text).strip() normalized = re.sub(r"[.!?,;:]+$", "", normalized) return normalized in {"sim", "pode", "ok", "confirmo", "aceito", "fechado", "pode sim", "tenho", "tenho sim"} def _is_negative_message(self, text: str) -> bool: normalized = self._normalize_text(text).strip() normalized = re.sub(r"[.!?,;:]+$", "", normalized) return ( normalized in {"nao", "nao quero", "prefiro outro", "outro horario"} or normalized.startswith("nao") ) def _extract_time_only(self, text: str) -> str | None: match = re.search(r"\b([01]?\d|2[0-3]):([0-5]\d)\b", text or "") if not match: return None return f"{int(match.group(1)):02d}:{match.group(2)}" def _merge_date_with_time(self, base_iso: str, new_time_hhmm: str) -> str | None: try: base_dt = datetime.fromisoformat((base_iso or "").replace("Z", "+00:00")) except ValueError: return None try: hour_text, minute_text = new_time_hhmm.split(":") merged = base_dt.replace(hour=int(hour_text), minute=int(minute_text), second=0, microsecond=0) return merged.isoformat() except Exception: return None def 
_capture_review_confirmation_suggestion( self, tool_name: str, arguments: dict, exc: HTTPException, user_id: int | None, ) -> None: if tool_name != "agendar_revisao" or user_id is None or exc.status_code != 409: return detail = exc.detail if isinstance(exc.detail, str) else "" match = re.search(r"ISO:\s*([^)]+)\)", detail) if not match: return suggested_iso = match.group(1).strip() payload = dict(arguments or {}) if not payload.get("placa"): return payload["data_hora"] = suggested_iso self.PENDING_REVIEW_CONFIRMATIONS[user_id] = { "payload": payload, "expires_at": datetime.utcnow() + timedelta(minutes=PENDING_REVIEW_TTL_MINUTES), } async def _try_confirm_pending_review( self, message: str, user_id: int | None, extracted_review_fields: dict | None = None, ) -> str | None: if user_id is None: return None pending = self.PENDING_REVIEW_CONFIRMATIONS.get(user_id) if not pending: return None time_only = self._extract_time_only(message) if self._is_negative_message(message) or time_only: # Se o usuario recusar a sugestao e informar novo horario, reaproveita # o payload pendente com a nova data/hora. extracted = self._normalize_review_fields(extracted_review_fields) new_data_hora = extracted.get("data_hora") if not new_data_hora and time_only: new_data_hora = self._merge_date_with_time(pending["payload"].get("data_hora", ""), time_only) if not new_data_hora: self.PENDING_REVIEW_CONFIRMATIONS.pop(user_id, None) return "Sem problema. Me informe a nova data e hora desejada para a revisao." 
payload = dict(pending["payload"]) payload["data_hora"] = new_data_hora try: tool_result = await self.registry.execute( "agendar_revisao", payload, user_id=user_id, ) except HTTPException as exc: self.PENDING_REVIEW_CONFIRMATIONS.pop(user_id, None) self._capture_review_confirmation_suggestion( tool_name="agendar_revisao", arguments=payload, exc=exc, user_id=user_id, ) return self._http_exception_detail(exc) self.PENDING_REVIEW_CONFIRMATIONS.pop(user_id, None) self._store_last_review_package(user_id=user_id, payload=payload) return self._fallback_format_tool_result("agendar_revisao", tool_result) if not self._is_affirmative_message(message): return None if pending["expires_at"] < datetime.utcnow(): self.PENDING_REVIEW_CONFIRMATIONS.pop(user_id, None) return None try: tool_result = await self.registry.execute( "agendar_revisao", pending["payload"], user_id=user_id, ) except HTTPException as exc: self.PENDING_REVIEW_CONFIRMATIONS.pop(user_id, None) return self._http_exception_detail(exc) self.PENDING_REVIEW_CONFIRMATIONS.pop(user_id, None) self._store_last_review_package(user_id=user_id, payload=pending.get("payload")) return self._fallback_format_tool_result("agendar_revisao", tool_result) def _build_router_prompt(self, user_message: str, user_id: int | None) -> str: user_context = f"Contexto de usuario autenticado: user_id={user_id}.\n" if user_id else "" conversation_context = self._build_context_summary(user_id=user_id) return ( "Voce e um assistente de concessionaria. " "Sempre que a solicitacao depender de dados operacionais (estoque, validacao de cliente, " "avaliacao de troca, agendamento de revisao, realizacao ou cancelamento de pedido), use a tool correta. 
" "Se faltar parametro obrigatorio para a tool, responda em texto pedindo apenas o que falta.\n\n" f"{user_context}" f"{conversation_context}\n" f"Mensagem do usuario: {user_message}" ) def _build_force_tool_prompt(self, user_message: str, user_id: int | None) -> str: user_context = f"Contexto de usuario autenticado: user_id={user_id}.\n" if user_id else "" conversation_context = self._build_context_summary(user_id=user_id) return ( "Reavalie a mensagem e priorize chamar tool se houver intencao operacional. " "Use texto apenas quando faltar dado obrigatorio.\n\n" f"{user_context}" f"{conversation_context}\n" f"Mensagem do usuario: {user_message}" ) def _build_result_prompt( self, user_message: str, user_id: int | None, tool_name: str, tool_result, ) -> str: user_context = f"Contexto de usuario autenticado: user_id={user_id}.\n" if user_id else "" conversation_context = self._build_context_summary(user_id=user_id) return ( "Responda ao usuario de forma objetiva usando o resultado da tool abaixo. " "Nao invente dados. Se a lista vier vazia, diga explicitamente que nao encontrou resultados. " "Retorne texto puro sem markdown, sem asteriscos, sem emojis e com linhas curtas.\n\n" f"{user_context}" f"{conversation_context}\n" f"Pergunta original: {user_message}\n" f"Tool executada: {tool_name}\n" f"Resultado da tool: {tool_result}" ) def _http_exception_detail(self, exc: HTTPException) -> str: detail = exc.detail if isinstance(detail, str): return detail return "Nao foi possivel concluir a operacao solicitada." 
def _format_datetime_for_chat(self, value: str) -> str:
    """Format an ISO-8601 timestamp as ``DD/MM/YYYY HH:MM`` for chat output.

    Accepts an ISO string (a trailing ``Z`` is read as UTC) or a ``datetime``
    instance. Empty values yield "N/A"; text that is not ISO-8601 is returned
    unchanged (as ``str``).
    """
    if isinstance(value, datetime):
        return value.strftime("%d/%m/%Y %H:%M")
    if not value:
        return "N/A"
    try:
        parsed = datetime.fromisoformat(str(value).replace("Z", "+00:00"))
    except ValueError:
        # Not an ISO timestamp: show whatever the tool returned.
        return str(value)
    return parsed.strftime("%d/%m/%Y %H:%M")


def _format_currency_br(self, value) -> str:
    """Format a number as Brazilian currency, e.g. ``R$ 1.234,50``.

    Returns "N/A" when ``value`` cannot be converted to a finite float.
    """
    import math

    try:
        number = float(value)
    except (TypeError, ValueError, OverflowError):
        return "N/A"
    if not math.isfinite(number):
        # NaN/inf would otherwise render as "R$ nan" / "R$ inf".
        return "N/A"
    # Swap the US separators ("," thousands, "." decimal) via a placeholder.
    formatted = f"{number:,.2f}".replace(",", "X").replace(".", ",").replace("X", ".")
    return f"R$ {formatted}"


def _fallback_format_tool_result(self, tool_name: str, tool_result) -> str:
    """Render a tool result as deterministic plain text for the chat.

    Each known tool has its own layout; any other tool name or result shape
    yields a generic success message.
    """
    if tool_name == "consultar_estoque" and isinstance(tool_result, list):
        if not tool_result:
            return "Nao encontrei nenhum veiculo com os criterios informados."
        linhas = [f"Encontrei {len(tool_result)} veiculo(s):"]
        # Only the first 10 vehicles are listed; the rest are summarized.
        for idx, item in enumerate(tool_result[:10], start=1):
            modelo = item.get("modelo", "N/A")
            categoria = item.get("categoria", "N/A")
            preco = self._format_currency_br(item.get("preco"))
            linhas.append(f"{idx}. {modelo} ({categoria}) - {preco}")
        restantes = len(tool_result) - 10
        if restantes > 0:
            linhas.append(f"... e mais {restantes} veiculo(s).")
        return "\n".join(linhas)

    if tool_name == "cancelar_pedido" and isinstance(tool_result, dict):
        numero = tool_result.get("numero_pedido", "N/A")
        status = tool_result.get("status", "N/A")
        motivo = tool_result.get("motivo")
        linhas = [f"Pedido {numero} atualizado.", f"Status: {status}"]
        if motivo:
            linhas.append(f"Motivo: {motivo}")
        return "\n".join(linhas)

    if tool_name == "realizar_pedido" and isinstance(tool_result, dict):
        numero = tool_result.get("numero_pedido", "N/A")
        valor = self._format_currency_br(tool_result.get("valor_veiculo"))
        return f"Pedido criado com sucesso.\nNumero: {numero}\nValor: {valor}"

    if tool_name == "agendar_revisao" and isinstance(tool_result, dict):
        placa = tool_result.get("placa", "N/A")
        data_hora = self._format_datetime_for_chat(tool_result.get("data_hora", "N/A"))
        protocolo = tool_result.get("protocolo", "N/A")
        valor = tool_result.get("valor_revisao")
        linhas = [
            "Revisao agendada com sucesso.",
            f"Protocolo: {protocolo}",
            f"Placa: {placa}",
            f"Data/Hora: {data_hora}",
        ]
        # The estimated price line is shown only when the tool returned a number.
        if isinstance(valor, (int, float)):
            linhas.append(f"Valor estimado: {self._format_currency_br(valor)}")
        return "\n".join(linhas)

    if tool_name == "listar_agendamentos_revisao" and isinstance(tool_result, list):
        if not tool_result:
            return "Nao encontrei agendamentos de revisao para sua conta."
        linhas = [f"Voce tem {len(tool_result)} agendamento(s):"]
        # Only the first 12 appointments are listed; the rest are summarized.
        for idx, item in enumerate(tool_result[:12], start=1):
            protocolo = item.get("protocolo", "N/A")
            placa = item.get("placa", "N/A")
            data_hora = self._format_datetime_for_chat(item.get("data_hora", "N/A"))
            status = item.get("status", "N/A")
            linhas.append(f"{idx}) Protocolo: {protocolo}")
            linhas.append(f"Placa: {placa}")
            linhas.append(f"Data/Hora: {data_hora} | Status: {status}")
            # Blank separator between entries, but not after the last one shown.
            if idx < min(len(tool_result), 12):
                linhas.append("")
        restantes = len(tool_result) - 12
        if restantes > 0:
            if linhas and linhas[-1] != "":
                linhas.append("")
            linhas.append(f"... e mais {restantes} agendamento(s).")
        return "\n".join(linhas)

    if tool_name == "cancelar_agendamento_revisao" and isinstance(tool_result, dict):
        protocolo = tool_result.get("protocolo", "N/A")
        status = tool_result.get("status", "N/A")
        placa = tool_result.get("placa", "N/A")
        data_hora = self._format_datetime_for_chat(tool_result.get("data_hora", "N/A"))
        return (
            "Agendamento atualizado.\n"
            f"Protocolo: {protocolo}\n"
            f"Placa: {placa}\n"
            f"Data/Hora: {data_hora}\n"
            f"Status: {status}"
        )

    if tool_name == "editar_data_revisao" and isinstance(tool_result, dict):
        protocolo = tool_result.get("protocolo", "N/A")
        placa = tool_result.get("placa", "N/A")
        data_hora = self._format_datetime_for_chat(tool_result.get("data_hora", "N/A"))
        status = tool_result.get("status", "N/A")
        return (
            "Agendamento remarcado com sucesso.\n"
            f"Protocolo: {protocolo}\n"
            f"Placa: {placa}\n"
            f"Nova data/hora: {data_hora}\n"
            f"Status: {status}"
        )

    if tool_name == "validar_cliente_venda" and isinstance(tool_result, dict):
        aprovado = tool_result.get("aprovado")
        limite = self._format_currency_br(tool_result.get("limite_credito"))
        score = tool_result.get("score", "N/A")
        cpf = tool_result.get("cpf", "N/A")
        # Only the headline differs between the approved and rejected layouts.
        if aprovado:
            cabecalho = "Cliente aprovado para financiamento."
        else:
            cabecalho = "Cliente nao aprovado para financiamento."
        return (
            f"{cabecalho}\n"
            f"CPF: {cpf}\n"
            f"Score: {score}\n"
            f"Limite: {limite}"
        )

    return "Operacao concluida com sucesso."