diff --git a/app/services/orchestrator_config.py b/app/services/orchestrator_config.py new file mode 100644 index 0000000..9204ea7 --- /dev/null +++ b/app/services/orchestrator_config.py @@ -0,0 +1,48 @@ +USER_CONTEXT_TTL_MINUTES = 60 + +PENDING_REVIEW_TTL_MINUTES = 30 +PENDING_REVIEW_DRAFT_TTL_MINUTES = 30 +PENDING_ORDER_DRAFT_TTL_MINUTES = 30 +PENDING_CANCEL_ORDER_DRAFT_TTL_MINUTES = 30 + +REVIEW_REQUIRED_FIELDS = ( + "placa", + "data_hora", + "modelo", + "ano", + "km", + "revisao_previa_concessionaria", +) + +ORDER_REQUIRED_FIELDS = ( + "cpf", + "valor_veiculo", +) + +CANCEL_ORDER_REQUIRED_FIELDS = ( + "numero_pedido", + "motivo", +) + +LOW_VALUE_RESPONSES = { + "certo.", + "certo", + "ok.", + "ok", + "entendi.", + "entendi", + "claro.", + "claro", +} + +DETERMINISTIC_RESPONSE_TOOLS = { + "consultar_estoque", + "validar_cliente_venda", + "avaliar_veiculo_troca", + "agendar_revisao", + "listar_agendamentos_revisao", + "cancelar_agendamento_revisao", + "editar_data_revisao", + "cancelar_pedido", + "realizar_pedido", +} diff --git a/app/services/orquestrador_service.py b/app/services/orquestrador_service.py index 2d702fd..2fbbc79 100644 --- a/app/services/orquestrador_service.py +++ b/app/services/orquestrador_service.py @@ -1,54 +1,39 @@ import re +import json +import logging import unicodedata from datetime import datetime, timedelta from fastapi import HTTPException from sqlalchemy.orm import Session +from app.services.orchestrator_config import ( + CANCEL_ORDER_REQUIRED_FIELDS, + DETERMINISTIC_RESPONSE_TOOLS, + LOW_VALUE_RESPONSES, + ORDER_REQUIRED_FIELDS, + PENDING_CANCEL_ORDER_DRAFT_TTL_MINUTES, + PENDING_ORDER_DRAFT_TTL_MINUTES, + PENDING_REVIEW_DRAFT_TTL_MINUTES, + PENDING_REVIEW_TTL_MINUTES, + REVIEW_REQUIRED_FIELDS, + USER_CONTEXT_TTL_MINUTES, +) from app.services.llm_service import LLMService from app.services.tool_registry import ToolRegistry +logger = logging.getLogger(__name__) + class OrquestradorService: USER_CONTEXTS: dict[int, dict] = {} - USER_CONTEXT_TTL_MINUTES = 60 # Memoria temporaria de confirmacao quando a API sugere novo horario (conflito 409). PENDING_REVIEW_CONFIRMATIONS: dict[int, dict] = {} - PENDING_REVIEW_TTL_MINUTES = 30 # Pode ser alterado por uma variavel de configuracao caso o sistema cresca # Rascunho por usuario para juntar dados de revisao enviados em mensagens separadas. PENDING_REVIEW_DRAFTS: dict[int, dict] = {} - PENDING_REVIEW_DRAFT_TTL_MINUTES = 30 - REVIEW_REQUIRED_FIELDS = ( - "placa", - "data_hora", - "modelo", - "ano", - "km", - "revisao_previa_concessionaria", - ) - - LOW_VALUE_RESPONSES = { - "certo.", - "certo", - "ok.", - "ok", - "entendi.", - "entendi", - "claro.", - "claro", - } - DETERMINISTIC_RESPONSE_TOOLS = { - "consultar_estoque", - "validar_cliente_venda", - "avaliar_veiculo_troca", - "agendar_revisao", - "listar_agendamentos_revisao", - "cancelar_agendamento_revisao", - "editar_data_revisao", - "cancelar_pedido", - "realizar_pedido", - } + PENDING_ORDER_DRAFTS: dict[int, dict] = {} + PENDING_CANCEL_ORDER_DRAFTS: dict[int, dict] = {} def __init__(self, db: Session): """Inicializa servicos de LLM e registro de tools para a sessao atual.""" @@ -69,32 +54,86 @@ class OrquestradorService: ) self._upsert_user_context(user_id=user_id) - self._capture_generic_memory(message=message, user_id=user_id) + + routing_plan = await self._extract_routing_with_llm(message=message, user_id=user_id) ( routing_message, queue_notice, queue_early_response, - ) = self._prepare_message_for_single_order(message=message, user_id=user_id) + ) = self._prepare_message_for_single_order( + message=message, + user_id=user_id, + routing_plan=routing_plan, + ) if queue_early_response: return await finish(queue_early_response, queue_notice=queue_notice) - context_switch_response = self._handle_context_switch(message=routing_message, user_id=user_id) + extracted_entities = await self._extract_entities_with_llm( + message=routing_message, + user_id=user_id, + ) + self._capture_generic_memory( + user_id=user_id, + llm_generic_fields=extracted_entities.get("generic_memory", {}), + ) + + domain_hint = self._domain_from_intents(extracted_entities.get("intents", {})) + context_switch_response = self._handle_context_switch( + message=routing_message, + user_id=user_id, + target_domain_hint=domain_hint, + ) if context_switch_response: return await finish(context_switch_response, queue_notice=queue_notice) - self._update_active_domain(message=routing_message, user_id=user_id) + self._update_active_domain(user_id=user_id, domain_hint=domain_hint) + + review_management_response = await self._try_handle_review_management( + message=routing_message, + user_id=user_id, + intents=extracted_entities.get("intents", {}), + ) + if review_management_response: + return await finish(review_management_response, queue_notice=queue_notice) # 1) Se houver sugestao pendente de horario e o usuario confirmou ("pode/sim"), # agenda direto no horario sugerido. - confirmation_response = await self._try_confirm_pending_review(message=routing_message, user_id=user_id) + confirmation_response = await self._try_confirm_pending_review( + message=routing_message, + user_id=user_id, + extracted_review_fields=extracted_entities.get("review_fields", {}), + ) if confirmation_response: return await finish(confirmation_response, queue_notice=queue_notice) # 2) Fluxo de coleta incremental de dados da revisao (slot filling). # Evita pedir tudo de novo quando o usuario responde em partes. - review_response = await self._try_collect_and_schedule_review(message=routing_message, user_id=user_id) + review_response = await self._try_collect_and_schedule_review( + message=routing_message, + user_id=user_id, + extracted_fields=extracted_entities.get("review_fields", {}), + intents=extracted_entities.get("intents", {}), + ) if review_response: return await finish(review_response, queue_notice=queue_notice) + # 3) Fluxo de coleta incremental para cancelamento de pedido. + cancel_order_response = await self._try_collect_and_cancel_order( + message=routing_message, + user_id=user_id, + extracted_fields=extracted_entities.get("cancel_order_fields", {}), + intents=extracted_entities.get("intents", {}), + ) + if cancel_order_response: + return await finish(cancel_order_response, queue_notice=queue_notice) + # 4) Fluxo de coleta incremental para realizacao de pedido. + order_response = await self._try_collect_and_create_order( + message=routing_message, + user_id=user_id, + extracted_fields=extracted_entities.get("order_fields", {}), + intents=extracted_entities.get("intents", {}), + ) + if order_response: + return await finish(order_response, queue_notice=queue_notice) tools = self.registry.get_tools() @@ -103,7 +142,7 @@ class OrquestradorService: tools=tools, ) - if not llm_result["tool_call"] and self._is_operational_query(routing_message): + if not llm_result["tool_call"] and self._has_operational_intent(extracted_entities): llm_result = await self.llm.generate_response( message=self._build_force_tool_prompt(user_message=routing_message, user_id=user_id), tools=tools, @@ -169,20 +208,28 @@ class OrquestradorService: self.PENDING_REVIEW_DRAFTS.pop(user_id, None) self.PENDING_REVIEW_CONFIRMATIONS.pop(user_id, None) + def _reset_pending_order_states(self, user_id: int | None) -> None: + if user_id is None: + return + self.PENDING_ORDER_DRAFTS.pop(user_id, None) + self.PENDING_CANCEL_ORDER_DRAFTS.pop(user_id, None) + + # Nessa função é onde eu configuro a memória volátil do sistema def _upsert_user_context(self, user_id: int | None) -> None: if user_id is None: return now = datetime.utcnow() context = self.USER_CONTEXTS.get(user_id) if context and context["expires_at"] >= now: - context["expires_at"] = now + timedelta(minutes=self.USER_CONTEXT_TTL_MINUTES) + context["expires_at"] = now + timedelta(minutes=USER_CONTEXT_TTL_MINUTES) return self.USER_CONTEXTS[user_id] = { "active_domain": "general", "generic_memory": {}, + "shared_memory": {}, "order_queue": [], "pending_switch": None, - "expires_at": now + timedelta(minutes=self.USER_CONTEXT_TTL_MINUTES), + "expires_at": now + timedelta(minutes=USER_CONTEXT_TTL_MINUTES), } def _get_user_context(self, user_id: int | None) -> dict | None: @@ -196,46 +243,392 @@ class OrquestradorService: return None return context - def _extract_generic_memory_fields(self, message: str) -> dict: + def _extract_generic_memory_fields(self, llm_generic_fields: dict | None = None) -> dict: extracted: dict = {} - text = message or "" - lowered = self._normalize_text(text) + llm_fields = llm_generic_fields or {} - plate_match = re.search( - r"\b([A-Za-z]{3}[0-9][A-Za-z0-9][0-9]{2}|[A-Za-z]{3}[0-9]{4})\b", - text, - ) - if plate_match: - extracted["placa"] = plate_match.group(1).upper() + normalized_plate = self._normalize_plate(llm_fields.get("placa")) + if normalized_plate: + extracted["placa"] = normalized_plate - budget_match = re.search( - r"\bate\s*(?:r\$)?\s*(\d{1,3}(?:[.\s]\d{3})+|\d+)\s*(?:mil)?\b", - lowered, - ) - if budget_match: - digits = re.sub(r"[.\s]", "", budget_match.group(1)) - if digits.isdigit(): - value = int(digits) - if "mil" in lowered[budget_match.start(): budget_match.end()]: - value *= 1000 - extracted["orcamento_max"] = value - - vehicle_profile = [] - for marker in ("suv", "sedan", "hatch", "pickup"): - if marker in lowered: - vehicle_profile.append(marker) - if vehicle_profile: - extracted["perfil_veiculo"] = vehicle_profile + normalized_cpf = self._normalize_cpf(llm_fields.get("cpf")) + if normalized_cpf: + extracted["cpf"] = normalized_cpf + + normalized_budget = self._normalize_positive_number(llm_fields.get("orcamento_max")) + if normalized_budget: + extracted["orcamento_max"] = int(round(normalized_budget)) + + normalized_profile = self._normalize_vehicle_profile(llm_fields.get("perfil_veiculo")) + if normalized_profile: + extracted["perfil_veiculo"] = normalized_profile return extracted - def _capture_generic_memory(self, message: str, user_id: int | None) -> None: + def _capture_generic_memory( + self, + user_id: int | None, + llm_generic_fields: dict | None = None, + ) -> None: context = self._get_user_context(user_id) if not context: return - fields = self._extract_generic_memory_fields(message) + fields = self._extract_generic_memory_fields(llm_generic_fields=llm_generic_fields) if fields: + # "Memoria generica" e um dict acumulado por usuario. + # Campos novos entram e campos repetidos sobrescrevem valor antigo. context["generic_memory"].update(fields) + context.setdefault("shared_memory", {}).update(fields) + + def _new_tab_memory(self, user_id: int | None) -> dict: + context = self._get_user_context(user_id) + if not context: + return {} + shared = context.get("shared_memory", {}) + if not isinstance(shared, dict): + return {} + return dict(shared) + + def _empty_extraction_payload(self) -> dict: + return { + "generic_memory": {}, + "review_fields": {}, + "order_fields": {}, + "cancel_order_fields": {}, + "intents": {}, + } + + def _coerce_extraction_contract(self, payload) -> dict: + if not isinstance(payload, dict): + return self._empty_extraction_payload() + contract = self._empty_extraction_payload() + for key in contract: + value = payload.get(key) + contract[key] = value if isinstance(value, dict) else {} + if key not in payload: + logger.info("Extracao sem secao '%s'; usando vazio.", key) + return contract + + async def _extract_routing_with_llm(self, message: str, user_id: int | None) -> dict: + prompt = ( + "Analise a mensagem e retorne APENAS JSON valido para roteamento multiassunto.\n" + "Sem markdown e sem texto extra.\n\n" + "Formato:\n" + "{\n" + ' "orders": [\n' + ' {"domain": "review|sales|general", "message": "trecho literal do pedido"}\n' + " ]\n" + "}\n\n" + "Regras:\n" + "- Se houver mais de um pedido operacional, separe em itens distintos em ordem de aparicao.\n" + "- Se nao houver pedido operacional, use domain='general' com a mensagem inteira.\n" + "- Mantenha cada message curta e fiel ao texto do usuario.\n\n" + f"Contexto: user_id={user_id if user_id is not None else 'anonimo'}\n" + f"Mensagem do usuario: {message}" + ) + default = {"orders": [{"domain": "general", "message": (message or "").strip()}]} + try: + result = await self.llm.generate_response(message=prompt, tools=[]) + text = (result.get("response") or "").strip() + payload = self._parse_json_object(text) + if not isinstance(payload, dict): + logger.warning("Roteamento invalido (nao JSON objeto). user_id=%s", user_id) + return default + orders = payload.get("orders") + if not isinstance(orders, list): + return default + normalized: list[dict] = [] + for item in orders: + if not isinstance(item, dict): + continue + domain = str(item.get("domain") or "general").strip().lower() + if domain not in {"review", "sales", "general"}: + domain = "general" + segment = str(item.get("message") or "").strip() + if segment: + normalized.append({"domain": domain, "message": segment}) + if not normalized: + return default + return {"orders": normalized} + except Exception: + logger.exception("Falha ao rotear multiassunto com LLM. user_id=%s", user_id) + return default + + async def _extract_entities_with_llm(self, message: str, user_id: int | None) -> dict: + user_context = f"user_id={user_id}" if user_id is not None else "user_id=anonimo" + prompt = ( + "Extraia entidades da mensagem do usuario e retorne APENAS JSON valido.\n" + "Nao use markdown, nao adicione texto antes/depois, nao invente dados ausentes.\n" + "Se nao houver valor, use null ou lista vazia.\n\n" + "Formato obrigatorio:\n" + "{\n" + ' "generic_memory": {\n' + ' "placa": null,\n' + ' "cpf": null,\n' + ' "orcamento_max": null,\n' + ' "perfil_veiculo": []\n' + " },\n" + ' "review_fields": {\n' + ' "placa": null,\n' + ' "data_hora": null,\n' + ' "modelo": null,\n' + ' "ano": null,\n' + ' "km": null,\n' + ' "revisao_previa_concessionaria": null\n' + " },\n" + ' "order_fields": {\n' + ' "cpf": null,\n' + ' "valor_veiculo": null\n' + " },\n" + ' "cancel_order_fields": {\n' + ' "numero_pedido": null,\n' + ' "motivo": null\n' + " },\n" + ' "intents": {\n' + ' "review_schedule": false,\n' + ' "review_list": false,\n' + ' "order_create": false,\n' + ' "order_cancel": false\n' + " }\n" + "}\n\n" + f"Contexto: {user_context}\n" + f"Mensagem do usuario: {message}" + ) + + default = self._empty_extraction_payload() + try: + result = await self.llm.generate_response(message=prompt, tools=[]) + text = (result.get("response") or "").strip() + if not text: + logger.warning("Extracao vazia do LLM. user_id=%s", user_id) + return default + payload = self._parse_json_object(text) + if not isinstance(payload, dict): + logger.warning("Extracao invalida (nao JSON objeto). user_id=%s", user_id) + return default + coerced = self._coerce_extraction_contract(payload) + return { + "generic_memory": self._normalize_generic_fields(coerced.get("generic_memory")), + "review_fields": self._normalize_review_fields(coerced.get("review_fields")), + "order_fields": self._normalize_order_fields(coerced.get("order_fields")), + "cancel_order_fields": self._normalize_cancel_order_fields(coerced.get("cancel_order_fields")), + "intents": self._normalize_intents(coerced.get("intents")), + } + except Exception: + logger.exception("Falha ao extrair entidades com LLM. user_id=%s", user_id) + return default + + def _parse_json_object(self, text: str): + candidate = (text or "").strip() + if not candidate: + return None + if candidate.startswith("```"): + candidate = re.sub(r"^```(?:json)?\s*", "", candidate, flags=re.IGNORECASE) + candidate = re.sub(r"\s*```$", "", candidate) + try: + return json.loads(candidate) + except json.JSONDecodeError: + match = re.search(r"\{.*\}", candidate, flags=re.DOTALL) + if not match: + logger.warning("Extracao sem JSON valido no texto retornado.") + return None + try: + return json.loads(match.group(0)) + except json.JSONDecodeError: + logger.warning("Extracao com JSON invalido apos recorte.") + return None + + def _normalize_plate(self, value) -> str | None: + text = str(value or "").strip().upper() + if not text: + return None + if re.fullmatch(r"[A-Z]{3}[0-9][A-Z0-9][0-9]{2}", text) or re.fullmatch(r"[A-Z]{3}[0-9]{4}", text): + return text + compact = re.sub(r"[^A-Z0-9]", "", text) + if re.fullmatch(r"[A-Z]{3}[0-9][A-Z0-9][0-9]{2}", compact) or re.fullmatch(r"[A-Z]{3}[0-9]{4}", compact): + return compact + return None + + def _normalize_cpf(self, value) -> str | None: + digits = re.sub(r"\D", "", str(value or "")) + if len(digits) == 11: + return digits + return None + + def _normalize_positive_number(self, value) -> float | None: + if value is None: + return None + if isinstance(value, (int, float)): + number = float(value) + return number if number > 0 else None + text = self._normalize_text(str(value)) + text = text.replace("r$", "").strip() + multiplier = 1000 if "mil" in text else 1 + text = text.replace("mil", "").strip() + digits = re.sub(r"[^0-9,.\s]", "", text) + if not digits: + return None + numeric = digits.replace(".", "").replace(" ", "").replace(",", ".") + try: + number = float(numeric) * multiplier + return number if number > 0 else None + except ValueError: + return None + + def _normalize_vehicle_profile(self, value) -> list[str]: + if value is None: + return [] + allowed = {"suv", "sedan", "hatch", "pickup"} + items = value if isinstance(value, list) else [value] + normalized: list[str] = [] + for item in items: + marker = self._normalize_text(str(item)).strip() + if marker in allowed and marker not in normalized: + normalized.append(marker) + return normalized + + def _normalize_bool(self, value) -> bool | None: + if isinstance(value, bool): + return value + lowered = self._normalize_text(str(value or "")).strip() + if lowered in {"sim", "true", "1", "yes"}: + return True + if lowered in {"nao", "false", "0", "no"}: + return False + return None + + def _normalize_review_datetime_text(self, value) -> str | None: + text = str(value or "").strip() + if not text: + return None + + # Mantem formatos absolutos que o handler ja sabe interpretar. + absolute_patterns = ( + r"^\d{1,2}[/-]\d{1,2}[/-]\d{4}\s+(?:as\s+)?\d{1,2}:\d{2}(?::\d{2})?$", + r"^\d{4}[/-]\d{1,2}[/-]\d{1,2}\s+(?:as\s+)?\d{1,2}:\d{2}(?::\d{2})?$", + r"^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}(?::\d{2})?(?:Z|[+-]\d{2}:\d{2})?$", + ) + if any(re.match(pattern, text, flags=re.IGNORECASE) for pattern in absolute_patterns): + return text + + normalized = self._normalize_text(text) + day_offset = None + if "amanha" in normalized: + day_offset = 1 + elif "hoje" in normalized: + day_offset = 0 + if day_offset is None: + return text + + time_match = re.search(r"\b([01]?\d|2[0-3]):([0-5]\d)\b", normalized) + if not time_match: + return text + + hour = int(time_match.group(1)) + minute = int(time_match.group(2)) + target_date = datetime.now() + timedelta(days=day_offset) + return f"{target_date.strftime('%d/%m/%Y')} {hour:02d}:{minute:02d}" + + def _normalize_generic_fields(self, data) -> dict: + if not isinstance(data, dict): + return {} + extracted: dict = {} + plate = self._normalize_plate(data.get("placa")) + if plate: + extracted["placa"] = plate + cpf = self._normalize_cpf(data.get("cpf")) + if cpf: + extracted["cpf"] = cpf + budget = self._normalize_positive_number(data.get("orcamento_max")) + if budget: + extracted["orcamento_max"] = int(round(budget)) + profile = self._normalize_vehicle_profile(data.get("perfil_veiculo")) + if profile: + extracted["perfil_veiculo"] = profile + return extracted + + def _normalize_review_fields(self, data) -> dict: + if not isinstance(data, dict): + return {} + extracted: dict = {} + plate = self._normalize_plate(data.get("placa")) + if plate: + extracted["placa"] = plate + date_time = self._normalize_review_datetime_text(data.get("data_hora")) + if date_time: + extracted["data_hora"] = date_time + model = str(data.get("modelo") or "").strip(" ,.;") + if model: + extracted["modelo"] = model.title() + year = self._normalize_positive_number(data.get("ano")) + if year: + year_int = int(round(year)) + if 1900 <= year_int <= 2100: + extracted["ano"] = year_int + km = self._normalize_positive_number(data.get("km")) + if km: + extracted["km"] = int(round(km)) + reviewed = self._normalize_bool(data.get("revisao_previa_concessionaria")) + if reviewed is not None: + extracted["revisao_previa_concessionaria"] = reviewed + return extracted + + def _normalize_order_fields(self, data) -> dict: + if not isinstance(data, dict): + return {} + extracted: dict = {} + cpf = self._normalize_cpf(data.get("cpf")) + if cpf: + extracted["cpf"] = cpf + value = self._normalize_positive_number(data.get("valor_veiculo")) + if value: + extracted["valor_veiculo"] = round(value, 2) + return extracted + + def _normalize_cancel_order_fields(self, data) -> dict: + if not isinstance(data, dict): + return {} + extracted: dict = {} + order_number = str(data.get("numero_pedido") or "").strip().upper() + if order_number and re.fullmatch(r"PED-[A-Z0-9\-]+", order_number): + extracted["numero_pedido"] = order_number + reason = str(data.get("motivo") or "").strip(" .;") + if reason: + extracted["motivo"] = reason + return extracted + + def _normalize_intents(self, data) -> dict: + if not isinstance(data, dict): + data = {} + return { + "review_schedule": bool(self._normalize_bool(data.get("review_schedule"))), + "review_list": bool(self._normalize_bool(data.get("review_list"))), + "order_create": bool(self._normalize_bool(data.get("order_create"))), + "order_cancel": bool(self._normalize_bool(data.get("order_cancel"))), + } + + def _has_operational_intent(self, extracted_entities: dict | None) -> bool: + if not isinstance(extracted_entities, dict): + return False + intents = self._normalize_intents(extracted_entities.get("intents")) + if any(intents.values()): + return True + return any( + bool(extracted_entities.get(key)) + for key in ("review_fields", "order_fields", "cancel_order_fields") + ) + + def _try_prefill_review_fields_from_memory(self, user_id: int | None, payload: dict) -> None: + if user_id is None: + return + context = self._get_user_context(user_id) + if not context: + return + memory = context.get("generic_memory", {}) + if payload.get("placa") is None: + plate = self._normalize_plate(memory.get("placa")) + if plate: + payload["placa"] = plate def _queue_order(self, user_id: int | None, domain: str, order_message: str) -> None: context = self._get_user_context(user_id) @@ -248,6 +641,7 @@ class OrquestradorService: { "domain": domain, "message": (order_message or "").strip(), + "memory_seed": self._new_tab_memory(user_id=user_id), "created_at": datetime.utcnow().isoformat(), } ) @@ -261,62 +655,12 @@ class OrquestradorService: return None return queue.pop(0) - def _extract_order_requests(self, message: str) -> list[dict]: - text = (message or "").strip() - if not text: - return [] - - lowered = self._normalize_text(text) - domain_patterns = { - "review": ( - r"\brevis[a-z]*\b", - r"\bagendar\b", - r"\bmarcar\b", - r"\bremarcar\b", - r"\bagendamento\b", - ), - "sales": ( - r"\bcomprar\b", - r"\bcompra\b", - r"\bcarro\b", - r"\bveiculo\b", - r"\bestoque\b", - r"\bpedido\b", - r"\bfinanci[a-z]*\b", - ), - } - - matches: list[tuple[int, str]] = [] - for domain, patterns in domain_patterns.items(): - for pattern in patterns: - for hit in re.finditer(pattern, lowered): - matches.append((hit.start(), domain)) - - if not matches: - inferred = self._infer_domain(text) - return [{"domain": inferred, "message": text}] if inferred != "general" else [] - - matches.sort(key=lambda item: item[0]) - transitions: list[tuple[int, str]] = [] - for position, domain in matches: - if not transitions or transitions[-1][1] != domain: - transitions.append((position, domain)) - - if len(transitions) <= 1: - inferred = self._infer_domain(text) - return [{"domain": inferred, "message": text}] if inferred != "general" else [] - - requests: list[dict] = [] - for idx, (start, domain) in enumerate(transitions): - segment_start = 0 if idx == 0 else start - segment_end = transitions[idx + 1][0] if idx + 1 < len(transitions) else len(text) - segment = text[segment_start:segment_end].strip(" ,;.") - if segment: - requests.append({"domain": domain, "message": segment}) - - return requests - - def _prepare_message_for_single_order(self, message: str, user_id: int | None) -> tuple[str, str | None, str | None]: + def _prepare_message_for_single_order( + self, + message: str, + user_id: int | None, + routing_plan: dict | None = None, + ) -> tuple[str, str | None, str | None]: context = self._get_user_context(user_id) if not context: return message, None, None @@ -324,9 +668,23 @@ class OrquestradorService: queue_notice = None active_domain = context.get("active_domain", "general") - extracted_orders = self._extract_order_requests(message) + orders_raw = (routing_plan or {}).get("orders") if isinstance(routing_plan, dict) else None + extracted_orders: list[dict] = [] + if isinstance(orders_raw, list): + for item in orders_raw: + if not isinstance(item, dict): + continue + domain = str(item.get("domain") or "general").strip().lower() + if domain not in {"review", "sales", "general"}: + domain = "general" + segment = str(item.get("message") or "").strip() + if segment: + extracted_orders.append({"domain": domain, "message": segment}) + if not extracted_orders: + extracted_orders = [{"domain": "general", "message": (message or "").strip()}] + if len(extracted_orders) <= 1: - inferred = self._infer_domain(message) + inferred = extracted_orders[0]["domain"] if ( inferred != "general" and inferred != active_domain @@ -369,13 +727,24 @@ class OrquestradorService: if domain == "review" and user_id is not None: draft = self.PENDING_REVIEW_DRAFTS.get(user_id) if draft: - missing = [field for field in self.REVIEW_REQUIRED_FIELDS if field not in draft.get("payload", {})] + missing = [field for field in REVIEW_REQUIRED_FIELDS if field not in draft.get("payload", {})] if missing: return self._render_missing_review_fields_prompt(missing) pending = self.PENDING_REVIEW_CONFIRMATIONS.get(user_id) if pending: return "Antes de mudar de assunto, me confirme se podemos concluir seu agendamento de revisao." + if domain == "sales" and user_id is not None: + draft = self.PENDING_ORDER_DRAFTS.get(user_id) + if draft: + missing = [field for field in ORDER_REQUIRED_FIELDS if field not in draft.get("payload", {})] + if missing: + return self._render_missing_order_fields_prompt(missing) + cancel_draft = self.PENDING_CANCEL_ORDER_DRAFTS.get(user_id) + if cancel_draft: + missing = [field for field in CANCEL_ORDER_REQUIRED_FIELDS if field not in cancel_draft.get("payload", {})] + if missing: + return self._render_missing_cancel_order_fields_prompt(missing) return "Vamos concluir este assunto primeiro e ja sigo com o proximo em seguida." @@ -403,36 +772,19 @@ class OrquestradorService: return base_response context["active_domain"] = next_order["domain"] + context["generic_memory"] = dict(next_order.get("memory_seed") or self._new_tab_memory(user_id=user_id)) + context["pending_switch"] = None next_response = await self.handle_message(next_order["message"], user_id=user_id) transition = self._build_next_order_transition(next_order["domain"]) return f"{base_response}\n\n{transition}\n{next_response}" - def _infer_domain(self, message: str) -> str: - lowered = self._normalize_text(message) - - review_keywords = ( - "revisao", - "agendamento", - "agendar", - "remarcar", - "cancelar agendamento", - "placa", - ) - sales_keywords = ( - "comprar", - "compra", - "veiculo", - "carro", - "estoque", - "financi", - "pedido", - ) - - has_review = any(k in lowered for k in review_keywords) - has_sales = any(k in lowered for k in sales_keywords) - if has_review and not has_sales: + def _domain_from_intents(self, intents: dict | None) -> str: + normalized = self._normalize_intents(intents) + review_score = int(normalized.get("review_schedule", False)) + int(normalized.get("review_list", False)) + sales_score = int(normalized.get("order_create", False)) + int(normalized.get("order_cancel", False)) + if review_score > sales_score and review_score > 0: return "review" - if has_sales and not has_review: + if sales_score > review_score and sales_score > 0: return "sales" return "general" @@ -447,6 +799,11 @@ class OrquestradorService: self.PENDING_REVIEW_DRAFTS.get(user_id) or self.PENDING_REVIEW_CONFIRMATIONS.get(user_id) ) + if domain == "sales": + return bool( + self.PENDING_ORDER_DRAFTS.get(user_id) + or self.PENDING_CANCEL_ORDER_DRAFTS.get(user_id) + ) return False def _apply_domain_switch(self, user_id: int | None, target_domain: str) -> None: @@ -456,10 +813,18 @@ class OrquestradorService: previous_domain = context.get("active_domain", "general") if previous_domain == "review": self._reset_pending_review_states(user_id=user_id) + if previous_domain == "sales": + self._reset_pending_order_states(user_id=user_id) context["active_domain"] = target_domain + context["generic_memory"] = self._new_tab_memory(user_id=user_id) context["pending_switch"] = None - def _handle_context_switch(self, message: str, user_id: int | None) -> str | None: + def _handle_context_switch( + self, + message: str, + user_id: int | None, + target_domain_hint: str = "general", + ) -> str | None: context = self._get_user_context(user_id) if not context: return None @@ -477,7 +842,7 @@ class OrquestradorService: return "Perfeito, vamos continuar no fluxo atual." current_domain = context.get("active_domain", "general") - target_domain = self._infer_domain(message) + target_domain = target_domain_hint if target_domain == "general" or target_domain == current_domain: return None if not self._has_open_flow(user_id=user_id, domain=current_domain): @@ -493,11 +858,11 @@ class OrquestradorService: target_domain=target_domain, ) - def _update_active_domain(self, message: str, user_id: int | None) -> None: + def _update_active_domain(self, user_id: int | None, domain_hint: str = "general") -> None: context = self._get_user_context(user_id) if not context: return - detected = self._infer_domain(message) + detected = domain_hint if detected != "general": context["active_domain"] = detected @@ -534,7 +899,7 @@ class OrquestradorService: return " ".join(summary) def _should_use_deterministic_response(self, tool_name: str) -> bool: - return tool_name in self.DETERMINISTIC_RESPONSE_TOOLS + return tool_name in DETERMINISTIC_RESPONSE_TOOLS def _normalize_text(self, text: str) -> str: normalized = unicodedata.normalize("NFKD", text or "") @@ -542,119 +907,31 @@ class OrquestradorService: return ascii_text.lower() def _is_low_value_response(self, text: str) -> bool: - return text.strip().lower() in self.LOW_VALUE_RESPONSES - - def _is_review_scheduling_intent(self, text: str) -> bool: - lowered = self._normalize_text(text) - scheduling_keywords = ( - "agendar", - "marcar revis", - "marcar manutenc", - "nova revis", - "quero agendar", - "quero marcar", - ) - return any(k in lowered for k in scheduling_keywords) - - def _is_review_management_intent(self, text: str) -> bool: - lowered = (text or "").lower() - management_keywords = ( - "agendamento", - "agendamentos", - "meus agendamentos", - "listar", - "mostrar", - "ver", - "cancelar revis", - "cancelar agendamento", - "remarcar", - "editar data", - "alterar data", - ) - return any(k in lowered for k in management_keywords) - - def _extract_review_fields(self, text: str) -> dict: - # Extrai os campos de revisao com regex simples para reduzir dependencia do LLM - # em mensagens curtas de follow-up. - lowered = self._normalize_text(text) - extracted: dict = {} + return text.strip().lower() in LOW_VALUE_RESPONSES - placa_match = re.search(r"\b([A-Za-z]{3}[0-9][A-Za-z0-9][0-9]{2}|[A-Za-z]{3}[0-9]{4})\b", text or "") - if placa_match: - extracted["placa"] = placa_match.group(1).upper() + async def _try_handle_review_management( + self, + message: str, + user_id: int | None, + intents: dict | None = None, + ) -> str | None: + if user_id is None: + return None + normalized_intents = self._normalize_intents(intents) + if not normalized_intents.get("review_list", False): + return None - dt_match = re.search( - r"(\d{1,2}[/-]\d{1,2}[/-]\d{4}\s*(?:as)?\s*\d{1,2}:\d{2})|" - r"(\d{4}[/-]\d{1,2}[/-]\d{1,2}\s*(?:as)?\s*\d{1,2}:\d{2})|" - r"(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}(?::\d{2})?(?:Z|[+-]\d{2}:\d{2})?)", - lowered, - ) - if dt_match: - value = next((g for g in dt_match.groups() if g), None) - if value: - extracted["data_hora"] = re.sub(r"\s+as\s+", " as ", value, flags=re.IGNORECASE) - else: - day_ref = None - if re.search(r"\bhoje\b", lowered): - day_ref = "hoje" - elif re.search(r"\bamanh[a-z]?\b", lowered): - day_ref = "amanha" - - if day_ref: - time_match = re.search(r"\b(?:as\s*)?([01]?\d|2[0-3])(?::([0-5]\d))?\b", lowered) - if time_match: - hour = int(time_match.group(1)) - minute = int(time_match.group(2) or "00") - target_date = datetime.now() - if day_ref == "amanha": - target_date = target_date + timedelta(days=1) - extracted["data_hora"] = f"{target_date.strftime('%d/%m/%Y')} {hour:02d}:{minute:02d}" - - modelo_match = re.search( - r"modelo\s+([a-z0-9][a-z0-9\s\-]{1,40}?)(?=\s*(?:,|ano\b|\d{1,3}(?:[.\s]\d{3})*\s*km\b|$))", - lowered, - ) - if modelo_match: - modelo = modelo_match.group(1).strip(" ,.;") - if modelo: - extracted["modelo"] = modelo.title() - - ano_match = re.search(r"\bano\s*(?:de\s*)?(19\d{2}|20\d{2})\b", lowered) - if not ano_match: - # Fallback sem a palavra "ano", evitando capturar o ano de uma data (ex.: 10/03/2026). - ano_match = re.search(r"(? str: labels = { @@ -668,14 +945,73 @@ class OrquestradorService: itens = [f"- {labels[field]}" for field in missing_fields] return "Para agendar sua revisao, preciso dos dados abaixo:\n" + "\n".join(itens) + def _is_valid_cpf(self, cpf: str) -> bool: + digits = re.sub(r"\D", "", cpf or "") + if len(digits) != 11: + return False + if digits == digits[0] * 11: + return False + + numbers = [int(d) for d in digits] + + sum_first = sum(n * w for n, w in zip(numbers[:9], range(10, 1, -1))) + first_digit = 11 - (sum_first % 11) + first_digit = 0 if first_digit >= 10 else first_digit + if first_digit != numbers[9]: + return False + + sum_second = sum(n * w for n, w in zip(numbers[:10], range(11, 1, -1))) + second_digit = 11 - (sum_second % 11) + second_digit = 0 if second_digit >= 10 else second_digit + return second_digit == numbers[10] + + def _try_prefill_order_value_from_memory(self, user_id: int | None, payload: dict) -> None: + # So preenche quando o usuario ainda nao informou valor explicitamente no fluxo atual. + if user_id is None or payload.get("valor_veiculo") is not None: + return + + context = self._get_user_context(user_id) + if not context: + return + memory = context.get("generic_memory", {}) + budget = memory.get("orcamento_max") + if isinstance(budget, (int, float)) and budget > 0: + # Reaproveita o orcamento capturado anteriormente como valor base do pedido. + payload["valor_veiculo"] = float(budget) + + def _render_missing_order_fields_prompt(self, missing_fields: list[str]) -> str: + labels = { + "cpf": "o CPF do cliente", + "valor_veiculo": "o valor do veiculo (R$)", + } + itens = [f"- {labels[field]}" for field in missing_fields] + return "Para realizar o pedido, preciso dos dados abaixo:\n" + "\n".join(itens) + + def _render_missing_cancel_order_fields_prompt(self, missing_fields: list[str]) -> str: + labels = { + "numero_pedido": "o numero do pedido (ex.: PED-20260305123456-ABC123)", + "motivo": "o motivo do cancelamento", + } + itens = [f"- {labels[field]}" for field in missing_fields] + return "Para cancelar o pedido, preciso dos dados abaixo:\n" + "\n".join(itens) + # Em vez de tentar entender tudo de uma vez, o bot mantem um "estado" do que ja sabe e vai perguntando apenas o que falta (os "slots" vazios) ate que a tarefa possa ser completada. - async def _try_collect_and_schedule_review(self, message: str, user_id: int | None) -> str | None: + async def _try_collect_and_schedule_review( + self, + message: str, + user_id: int | None, + extracted_fields: dict | None = None, + intents: dict | None = None, + ) -> str | None: if user_id is None: return None - # Nao inicia slot-filling para fluxos de listar/cancelar/remarcar revisao. - # Nesses casos o roteamento via LLM + tools deve seguir normalmente. - if self._is_review_management_intent(message): + normalized_intents = self._normalize_intents(intents) + has_intent = normalized_intents.get("review_schedule", False) + has_management_intent = normalized_intents.get("review_list", False) + + # Nao inicia slot-filling quando a intencao atual nao e de agendamento. + if has_management_intent: # Se o usuario mudou para gerenciamento de revisao, encerra # qualquer coleta pendente de novo agendamento. self.PENDING_REVIEW_DRAFTS.pop(user_id, None) @@ -687,12 +1023,19 @@ class OrquestradorService: self.PENDING_REVIEW_DRAFTS.pop(user_id, None) draft = None - extracted = self._extract_review_fields(message) - has_intent = self._is_review_scheduling_intent(message) + extracted = self._normalize_review_fields(extracted_fields) # Se houver rascunho de revisao, mas o usuario mudou para outra # intencao operacional (ex.: compra/estoque), descarta o rascunho. - if draft and not has_intent and self._is_operational_query(message): + if ( + draft + and not has_intent + and ( + normalized_intents.get("order_create", False) + or normalized_intents.get("order_cancel", False) + ) + and not extracted + ): self.PENDING_REVIEW_DRAFTS.pop(user_id, None) return None @@ -701,15 +1044,11 @@ class OrquestradorService: return None if draft is None: - draft = {"payload": {}, "expires_at": datetime.utcnow() + timedelta(minutes=self.PENDING_REVIEW_DRAFT_TTL_MINUTES)} - - # Permite o usuario "abortar" a coleta atual. - if "cancelar" in (message or "").lower() and draft["payload"]: - self.PENDING_REVIEW_DRAFTS.pop(user_id, None) - return None + draft = {"payload": {}, "expires_at": datetime.utcnow() + timedelta(minutes=PENDING_REVIEW_DRAFT_TTL_MINUTES)} # Merge incremental: apenas atualiza os campos detectados na mensagem atual. draft["payload"].update(extracted) + self._try_prefill_review_fields_from_memory(user_id=user_id, payload=draft["payload"]) # Se o usuario responder apenas "sim/nao" no follow-up, preenche o slot booleano. if ( "revisao_previa_concessionaria" not in draft["payload"] @@ -720,12 +1059,12 @@ class OrquestradorService: draft["payload"]["revisao_previa_concessionaria"] = True elif self._is_negative_message(message): draft["payload"]["revisao_previa_concessionaria"] = False - draft["expires_at"] = datetime.utcnow() + timedelta(minutes=self.PENDING_REVIEW_DRAFT_TTL_MINUTES) + draft["expires_at"] = datetime.utcnow() + timedelta(minutes=PENDING_REVIEW_DRAFT_TTL_MINUTES) self.PENDING_REVIEW_DRAFTS[user_id] = draft # Enquanto faltar campo obrigatorio, responde de forma deterministica # (sem depender do LLM para lembrar contexto). - missing = [field for field in self.REVIEW_REQUIRED_FIELDS if field not in draft["payload"]] + missing = [field for field in REVIEW_REQUIRED_FIELDS if field not in draft["payload"]] if missing: return self._render_missing_review_fields_prompt(missing) @@ -751,6 +1090,158 @@ class OrquestradorService: return self._fallback_format_tool_result("agendar_revisao", tool_result) + async def _try_collect_and_create_order( + self, + message: str, + user_id: int | None, + extracted_fields: dict | None = None, + intents: dict | None = None, + ) -> str | None: + if user_id is None: + return None + + normalized_intents = self._normalize_intents(intents) + draft = self.PENDING_ORDER_DRAFTS.get(user_id) + if draft and draft["expires_at"] < datetime.utcnow(): + self.PENDING_ORDER_DRAFTS.pop(user_id, None) + draft = None + + extracted = self._normalize_order_fields(extracted_fields) + has_intent = normalized_intents.get("order_create", False) + + if ( + draft + and not has_intent + and ( + normalized_intents.get("review_schedule", False) + or normalized_intents.get("review_list", False) + or normalized_intents.get("order_cancel", False) + ) + and not extracted + ): + self.PENDING_ORDER_DRAFTS.pop(user_id, None) + return None + + if not has_intent and draft is None: + return None + + if draft is None: + draft = { + "payload": {}, + "expires_at": datetime.utcnow() + timedelta(minutes=PENDING_ORDER_DRAFT_TTL_MINUTES), + } + + draft["payload"].update(extracted) + self._try_prefill_order_value_from_memory(user_id=user_id, payload=draft["payload"]) + + cpf_value = draft["payload"].get("cpf") + if cpf_value and not self._is_valid_cpf(str(cpf_value)): + draft["payload"].pop("cpf", None) + self.PENDING_ORDER_DRAFTS[user_id] = draft + return "Para seguir com o pedido, preciso de um CPF valido com 11 digitos." + + valor = draft["payload"].get("valor_veiculo") + if valor is not None: + try: + parsed = float(valor) + if parsed <= 0: + draft["payload"].pop("valor_veiculo", None) + else: + draft["payload"]["valor_veiculo"] = round(parsed, 2) + except (TypeError, ValueError): + draft["payload"].pop("valor_veiculo", None) + + draft["expires_at"] = datetime.utcnow() + timedelta(minutes=PENDING_ORDER_DRAFT_TTL_MINUTES) + self.PENDING_ORDER_DRAFTS[user_id] = draft + + missing = [field for field in ORDER_REQUIRED_FIELDS if field not in draft["payload"]] + if missing: + return self._render_missing_order_fields_prompt(missing) + + try: + tool_result = await self.registry.execute( + "realizar_pedido", + draft["payload"], + user_id=user_id, + ) + except HTTPException as exc: + return self._http_exception_detail(exc) + finally: + self.PENDING_ORDER_DRAFTS.pop(user_id, None) + + return self._fallback_format_tool_result("realizar_pedido", tool_result) + + async def _try_collect_and_cancel_order( + self, + message: str, + user_id: int | None, + extracted_fields: dict | None = None, + intents: dict | None = None, + ) -> str | None: + if user_id is None: + return None + + normalized_intents = self._normalize_intents(intents) + draft = self.PENDING_CANCEL_ORDER_DRAFTS.get(user_id) + if draft and draft["expires_at"] < datetime.utcnow(): + self.PENDING_CANCEL_ORDER_DRAFTS.pop(user_id, None) + draft = None + + extracted = self._normalize_cancel_order_fields(extracted_fields) + has_intent = normalized_intents.get("order_cancel", False) + + if ( + draft + and not has_intent + and ( + normalized_intents.get("review_schedule", False) + or normalized_intents.get("review_list", False) + or normalized_intents.get("order_create", False) + ) + and not extracted + ): + self.PENDING_CANCEL_ORDER_DRAFTS.pop(user_id, None) + return None + + if not has_intent and draft is None: + return None + + if draft is None: + draft = { + "payload": {}, + "expires_at": datetime.utcnow() + timedelta(minutes=PENDING_CANCEL_ORDER_DRAFT_TTL_MINUTES), + } + + if ( + "motivo" not in extracted + and draft["payload"].get("numero_pedido") + and not has_intent + ): + free_text = (message or "").strip() + if free_text and len(free_text) >= 4: + extracted["motivo"] = free_text + + draft["payload"].update(extracted) + draft["expires_at"] = datetime.utcnow() + timedelta(minutes=PENDING_CANCEL_ORDER_DRAFT_TTL_MINUTES) + self.PENDING_CANCEL_ORDER_DRAFTS[user_id] = draft + + missing = [field for field in CANCEL_ORDER_REQUIRED_FIELDS if field not in draft["payload"]] + if missing: + return self._render_missing_cancel_order_fields_prompt(missing) + + try: + tool_result = await self.registry.execute( + "cancelar_pedido", + draft["payload"], + user_id=user_id, + ) + except HTTPException as exc: + return self._http_exception_detail(exc) + finally: + self.PENDING_CANCEL_ORDER_DRAFTS.pop(user_id, None) + + return self._fallback_format_tool_result("cancelar_pedido", tool_result) + def _is_affirmative_message(self, text: str) -> bool: normalized = self._normalize_text(text).strip() normalized = re.sub(r"[.!?,;:]+$", "", normalized) @@ -802,10 +1293,15 @@ class OrquestradorService: payload["data_hora"] = suggested_iso self.PENDING_REVIEW_CONFIRMATIONS[user_id] = { "payload": payload, - "expires_at": datetime.utcnow() + timedelta(minutes=self.PENDING_REVIEW_TTL_MINUTES), + "expires_at": datetime.utcnow() + timedelta(minutes=PENDING_REVIEW_TTL_MINUTES), } - async def _try_confirm_pending_review(self, message: str, user_id: int | None) -> str | None: + async def _try_confirm_pending_review( + self, + message: str, + user_id: int | None, + extracted_review_fields: dict | None = None, + ) -> str | None: if user_id is None: return None pending = self.PENDING_REVIEW_CONFIRMATIONS.get(user_id) @@ -816,7 +1312,7 @@ class OrquestradorService: if self._is_negative_message(message) or time_only: # Se o usuario recusar a sugestao e informar novo horario, reaproveita # o payload pendente com a nova data/hora. - extracted = self._extract_review_fields(message) + extracted = self._normalize_review_fields(extracted_review_fields) new_data_hora = extracted.get("data_hora") if not new_data_hora and time_only: new_data_hora = self._merge_date_with_time(pending["payload"].get("data_hora", ""), time_only) @@ -864,33 +1360,6 @@ class OrquestradorService: self.PENDING_REVIEW_CONFIRMATIONS.pop(user_id, None) return self._fallback_format_tool_result("agendar_revisao", tool_result) - def _is_operational_query(self, message: str) -> bool: - text = message.lower() - keywords = ( - "estoque", - "carro", - "carros", - "suv", - "sedan", - "hatch", - "pickup", - "financi", - "cpf", - "troca", - "revis", - "agendamento", - "agendamentos", - "remarcar", - "placa", - "cancelar pedido", - "cancelar revisao", - "comprar", - "compra", - "realizar pedido", - "pedido", - ) - return any(k in text for k in keywords) - def _build_router_prompt(self, user_message: str, user_id: int | None) -> str: user_context = f"Contexto de usuario autenticado: user_id={user_id}.\n" if user_id else "" conversation_context = self._build_context_summary(user_id=user_id)