import re import unicodedata from datetime import datetime, timedelta from fastapi import HTTPException from sqlalchemy.orm import Session from app.services.llm_service import LLMService from app.services.tool_registry import ToolRegistry class OrquestradorService: # Memoria temporaria de confirmacao quando a API sugere novo horario (conflito 409). PENDING_REVIEW_CONFIRMATIONS: dict[int, dict] = {} PENDING_REVIEW_TTL_MINUTES = 30 # Pode ser alterado por uma variavel de configuracao caso o sistema cresca # Rascunho por usuario para juntar dados de revisao enviados em mensagens separadas. PENDING_REVIEW_DRAFTS: dict[int, dict] = {} PENDING_REVIEW_DRAFT_TTL_MINUTES = 30 REVIEW_REQUIRED_FIELDS = ( "placa", "data_hora", "modelo", "ano", "km", "revisao_previa_concessionaria", ) LOW_VALUE_RESPONSES = { "certo.", "certo", "ok.", "ok", "entendi.", "entendi", "claro.", "claro", } DETERMINISTIC_RESPONSE_TOOLS = { "consultar_estoque", "validar_cliente_venda", "avaliar_veiculo_troca", "agendar_revisao", "listar_agendamentos_revisao", "cancelar_agendamento_revisao", "editar_data_revisao", "cancelar_pedido", "realizar_pedido", } def __init__(self, db: Session): """Inicializa servicos de LLM e registro de tools para a sessao atual.""" self.llm = LLMService() self.registry = ToolRegistry(db) async def handle_message(self, message: str, user_id: int | None = None) -> str: """Processa mensagem, executa tool quando necessario e retorna resposta final.""" routing_message = self._resolve_primary_intent_message(message=message, user_id=user_id) # 1) Se houver sugestao pendente de horario e o usuario confirmou ("pode/sim"), # agenda direto no horario sugerido. confirmation_response = await self._try_confirm_pending_review(message=message, user_id=user_id) if confirmation_response: return confirmation_response # 2) Fluxo de coleta incremental de dados da revisao (slot filling). # Evita pedir tudo de novo quando o usuario responde em partes. review_response = await self._try_collect_and_schedule_review(message=message, user_id=user_id) if review_response: return review_response tools = self.registry.get_tools() llm_result = await self.llm.generate_response( message=self._build_router_prompt(user_message=routing_message, user_id=user_id), tools=tools, ) if not llm_result["tool_call"] and self._is_operational_query(routing_message): llm_result = await self.llm.generate_response( message=self._build_force_tool_prompt(user_message=routing_message, user_id=user_id), tools=tools, ) if llm_result["tool_call"]: tool_name = llm_result["tool_call"]["name"] arguments = llm_result["tool_call"]["arguments"] try: tool_result = await self.registry.execute( tool_name, arguments, user_id=user_id, ) except HTTPException as exc: self._capture_review_confirmation_suggestion( tool_name=tool_name, arguments=arguments, exc=exc, user_id=user_id, ) return self._http_exception_detail(exc) if self._should_use_deterministic_response(tool_name): return self._fallback_format_tool_result(tool_name, tool_result) final_response = await self.llm.generate_response( message=self._build_result_prompt( user_message=message, user_id=user_id, tool_name=tool_name, tool_result=tool_result, ), tools=[], ) text = (final_response.get("response") or "").strip() if self._is_low_value_response(text): return self._fallback_format_tool_result(tool_name, tool_result) return text or self._fallback_format_tool_result(tool_name, tool_result) text = (llm_result.get("response") or "").strip() if self._is_low_value_response(text): return "Entendi. Pode me dar mais detalhes para eu consultar corretamente?" return text def _reset_pending_review_states(self, user_id: int | None) -> None: if user_id is None: return self.PENDING_REVIEW_DRAFTS.pop(user_id, None) self.PENDING_REVIEW_CONFIRMATIONS.pop(user_id, None) def _is_purchase_intent(self, text: str) -> bool: lowered = self._normalize_text(text) keywords = ( "comprar", "compra", "carro", "carros", "veiculo", "veiculos", "estoque", ) return any(k in lowered for k in keywords) def _has_review_protocol(self, text: str) -> bool: return re.search(r"\brev-\d{8}-[a-z0-9]+\b", (text or "").lower()) is not None def _resolve_primary_intent_message(self, message: str, user_id: int | None) -> str: # Em mensagens mistas ("cancele ... agora quero comprar"), prioriza compra # quando nao ha protocolo explicito de revisao. if not self._is_purchase_intent(message): return message if not self._is_review_management_intent(message): return message if self._has_review_protocol(message): return message lowered = self._normalize_text(message) buy_markers = ("agora quero comprar", "quero comprar", "comprar", "compra") idx = -1 for marker in buy_markers: pos = lowered.rfind(marker) if pos > idx: idx = pos # Se identificar trecho de compra, usa apenas ele para rotear. if idx >= 0: self._reset_pending_review_states(user_id=user_id) return (message or "")[idx:].strip() or message return message def _should_use_deterministic_response(self, tool_name: str) -> bool: return tool_name in self.DETERMINISTIC_RESPONSE_TOOLS def _normalize_text(self, text: str) -> str: normalized = unicodedata.normalize("NFKD", text or "") ascii_text = normalized.encode("ascii", "ignore").decode("ascii") return ascii_text.lower() def _is_low_value_response(self, text: str) -> bool: return text.strip().lower() in self.LOW_VALUE_RESPONSES def _is_review_scheduling_intent(self, text: str) -> bool: lowered = self._normalize_text(text) scheduling_keywords = ( "agendar", "marcar revis", "marcar manutenc", "nova revis", "quero agendar", "quero marcar", ) return any(k in lowered for k in scheduling_keywords) def _is_review_management_intent(self, text: str) -> bool: lowered = (text or "").lower() management_keywords = ( "agendamento", "agendamentos", "meus agendamentos", "listar", "mostrar", "ver", "cancelar revis", "cancelar agendamento", "remarcar", "editar data", "alterar data", ) return any(k in lowered for k in management_keywords) def _extract_review_fields(self, text: str) -> dict: # Extrai os campos de revisao com regex simples para reduzir dependencia do LLM # em mensagens curtas de follow-up. lowered = self._normalize_text(text) extracted: dict = {} placa_match = re.search(r"\b([A-Za-z]{3}[0-9][A-Za-z0-9][0-9]{2}|[A-Za-z]{3}[0-9]{4})\b", text or "") if placa_match: extracted["placa"] = placa_match.group(1).upper() dt_match = re.search( r"(\d{1,2}[/-]\d{1,2}[/-]\d{4}\s*(?:as)?\s*\d{1,2}:\d{2})|" r"(\d{4}[/-]\d{1,2}[/-]\d{1,2}\s*(?:as)?\s*\d{1,2}:\d{2})|" r"(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}(?::\d{2})?(?:Z|[+-]\d{2}:\d{2})?)", lowered, ) if dt_match: value = next((g for g in dt_match.groups() if g), None) if value: extracted["data_hora"] = re.sub(r"\s+as\s+", " as ", value, flags=re.IGNORECASE) else: day_ref = None if re.search(r"\bhoje\b", lowered): day_ref = "hoje" elif re.search(r"\bamanh[a-z]?\b", lowered): day_ref = "amanha" if day_ref: time_match = re.search(r"\b(?:as\s*)?([01]?\d|2[0-3])(?::([0-5]\d))?\b", lowered) if time_match: hour = int(time_match.group(1)) minute = int(time_match.group(2) or "00") target_date = datetime.now() if day_ref == "amanha": target_date = target_date + timedelta(days=1) extracted["data_hora"] = f"{target_date.strftime('%d/%m/%Y')} {hour:02d}:{minute:02d}" modelo_match = re.search( r"modelo\s+([a-z0-9][a-z0-9\s\-]{1,40}?)(?=\s*(?:,|ano\b|\d{1,3}(?:[.\s]\d{3})*\s*km\b|$))", lowered, ) if modelo_match: modelo = modelo_match.group(1).strip(" ,.;") if modelo: extracted["modelo"] = modelo.title() ano_match = re.search(r"\bano\s*(?:de\s*)?(19\d{2}|20\d{2})\b", lowered) if not ano_match: # Fallback sem a palavra "ano", evitando capturar o ano de uma data (ex.: 10/03/2026). ano_match = re.search(r"(? str: labels = { "placa": "a placa do veiculo", "data_hora": "a data e hora desejada para a revisao", "modelo": "o modelo do veiculo", "ano": "o ano do veiculo", "km": "a quilometragem atual (km)", "revisao_previa_concessionaria": "se ja fez revisao na concessionaria (sim/nao)", } itens = [f"- {labels[field]}" for field in missing_fields] return "Para agendar sua revisao, preciso dos dados abaixo:\n" + "\n".join(itens) # Em vez de tentar entender tudo de uma vez, o bot mantem um "estado" do que ja sabe e vai perguntando apenas o que falta (os "slots" vazios) ate que a tarefa possa ser completada. async def _try_collect_and_schedule_review(self, message: str, user_id: int | None) -> str | None: if user_id is None: return None # Nao inicia slot-filling para fluxos de listar/cancelar/remarcar revisao. # Nesses casos o roteamento via LLM + tools deve seguir normalmente. if self._is_review_management_intent(message): # Se o usuario mudou para gerenciamento de revisao, encerra # qualquer coleta pendente de novo agendamento. self.PENDING_REVIEW_DRAFTS.pop(user_id, None) return None # Reaproveita rascunho anterior do usuario, se ainda estiver valido. draft = self.PENDING_REVIEW_DRAFTS.get(user_id) if draft and draft["expires_at"] < datetime.utcnow(): self.PENDING_REVIEW_DRAFTS.pop(user_id, None) draft = None extracted = self._extract_review_fields(message) has_intent = self._is_review_scheduling_intent(message) # Se houver rascunho de revisao, mas o usuario mudou para outra # intencao operacional (ex.: compra/estoque), descarta o rascunho. if draft and not has_intent and self._is_operational_query(message): self.PENDING_REVIEW_DRAFTS.pop(user_id, None) return None # Sem intencao de revisao e sem rascunho aberto: nao interfere no fluxo normal. if not has_intent and draft is None: return None if draft is None: draft = {"payload": {}, "expires_at": datetime.utcnow() + timedelta(minutes=self.PENDING_REVIEW_DRAFT_TTL_MINUTES)} # Permite o usuario "abortar" a coleta atual. if "cancelar" in (message or "").lower() and draft["payload"]: self.PENDING_REVIEW_DRAFTS.pop(user_id, None) return None # Merge incremental: apenas atualiza os campos detectados na mensagem atual. draft["payload"].update(extracted) # Se o usuario responder apenas "sim/nao" no follow-up, preenche o slot booleano. if ( "revisao_previa_concessionaria" not in draft["payload"] and draft["payload"] and not extracted ): if self._is_affirmative_message(message): draft["payload"]["revisao_previa_concessionaria"] = True elif self._is_negative_message(message): draft["payload"]["revisao_previa_concessionaria"] = False draft["expires_at"] = datetime.utcnow() + timedelta(minutes=self.PENDING_REVIEW_DRAFT_TTL_MINUTES) self.PENDING_REVIEW_DRAFTS[user_id] = draft # Enquanto faltar campo obrigatorio, responde de forma deterministica # (sem depender do LLM para lembrar contexto). missing = [field for field in self.REVIEW_REQUIRED_FIELDS if field not in draft["payload"]] if missing: return self._render_missing_review_fields_prompt(missing) try: # Com payload completo, executa a tool de agendamento. tool_result = await self.registry.execute( "agendar_revisao", draft["payload"], user_id=user_id, ) except HTTPException as exc: # Se houver conflito com sugestao de horario, salva para confirmar com "pode/sim". self._capture_review_confirmation_suggestion( tool_name="agendar_revisao", arguments=draft["payload"], exc=exc, user_id=user_id, ) return self._http_exception_detail(exc) finally: # Limpa o rascunho apos tentativa final para evitar estado sujo. self.PENDING_REVIEW_DRAFTS.pop(user_id, None) return self._fallback_format_tool_result("agendar_revisao", tool_result) def _is_affirmative_message(self, text: str) -> bool: normalized = self._normalize_text(text).strip() normalized = re.sub(r"[.!?,;:]+$", "", normalized) return normalized in {"sim", "pode", "ok", "confirmo", "aceito", "fechado", "pode sim"} def _is_negative_message(self, text: str) -> bool: normalized = self._normalize_text(text).strip() normalized = re.sub(r"[.!?,;:]+$", "", normalized) return ( normalized in {"nao", "nao quero", "prefiro outro", "outro horario"} or normalized.startswith("nao") ) def _extract_time_only(self, text: str) -> str | None: match = re.search(r"\b([01]?\d|2[0-3]):([0-5]\d)\b", text or "") if not match: return None return f"{int(match.group(1)):02d}:{match.group(2)}" def _merge_date_with_time(self, base_iso: str, new_time_hhmm: str) -> str | None: try: base_dt = datetime.fromisoformat((base_iso or "").replace("Z", "+00:00")) except ValueError: return None try: hour_text, minute_text = new_time_hhmm.split(":") merged = base_dt.replace(hour=int(hour_text), minute=int(minute_text), second=0, microsecond=0) return merged.isoformat() except Exception: return None def _capture_review_confirmation_suggestion( self, tool_name: str, arguments: dict, exc: HTTPException, user_id: int | None, ) -> None: if tool_name != "agendar_revisao" or user_id is None or exc.status_code != 409: return detail = exc.detail if isinstance(exc.detail, str) else "" match = re.search(r"ISO:\s*([^)]+)\)", detail) if not match: return suggested_iso = match.group(1).strip() payload = dict(arguments or {}) if not payload.get("placa"): return payload["data_hora"] = suggested_iso self.PENDING_REVIEW_CONFIRMATIONS[user_id] = { "payload": payload, "expires_at": datetime.utcnow() + timedelta(minutes=self.PENDING_REVIEW_TTL_MINUTES), } async def _try_confirm_pending_review(self, message: str, user_id: int | None) -> str | None: if user_id is None: return None pending = self.PENDING_REVIEW_CONFIRMATIONS.get(user_id) if not pending: return None time_only = self._extract_time_only(message) if self._is_negative_message(message) or time_only: # Se o usuario recusar a sugestao e informar novo horario, reaproveita # o payload pendente com a nova data/hora. extracted = self._extract_review_fields(message) new_data_hora = extracted.get("data_hora") if not new_data_hora and time_only: new_data_hora = self._merge_date_with_time(pending["payload"].get("data_hora", ""), time_only) if not new_data_hora: self.PENDING_REVIEW_CONFIRMATIONS.pop(user_id, None) return "Sem problema. Me informe a nova data e hora desejada para a revisao." payload = dict(pending["payload"]) payload["data_hora"] = new_data_hora try: tool_result = await self.registry.execute( "agendar_revisao", payload, user_id=user_id, ) except HTTPException as exc: self.PENDING_REVIEW_CONFIRMATIONS.pop(user_id, None) self._capture_review_confirmation_suggestion( tool_name="agendar_revisao", arguments=payload, exc=exc, user_id=user_id, ) return self._http_exception_detail(exc) self.PENDING_REVIEW_CONFIRMATIONS.pop(user_id, None) return self._fallback_format_tool_result("agendar_revisao", tool_result) if not self._is_affirmative_message(message): return None if pending["expires_at"] < datetime.utcnow(): self.PENDING_REVIEW_CONFIRMATIONS.pop(user_id, None) return None try: tool_result = await self.registry.execute( "agendar_revisao", pending["payload"], user_id=user_id, ) except HTTPException as exc: self.PENDING_REVIEW_CONFIRMATIONS.pop(user_id, None) return self._http_exception_detail(exc) self.PENDING_REVIEW_CONFIRMATIONS.pop(user_id, None) return self._fallback_format_tool_result("agendar_revisao", tool_result) def _is_operational_query(self, message: str) -> bool: text = message.lower() keywords = ( "estoque", "carro", "carros", "suv", "sedan", "hatch", "pickup", "financi", "cpf", "troca", "revis", "agendamento", "agendamentos", "remarcar", "placa", "cancelar pedido", "cancelar revisao", "comprar", "compra", "realizar pedido", "pedido", ) return any(k in text for k in keywords) def _build_router_prompt(self, user_message: str, user_id: int | None) -> str: user_context = f"Contexto de usuario autenticado: user_id={user_id}.\n" if user_id else "" return ( "Voce e um assistente de concessionaria. " "Sempre que a solicitacao depender de dados operacionais (estoque, validacao de cliente, " "avaliacao de troca, agendamento de revisao, realizacao ou cancelamento de pedido), use a tool correta. " "Se faltar parametro obrigatorio para a tool, responda em texto pedindo apenas o que falta.\n\n" f"{user_context}" f"Mensagem do usuario: {user_message}" ) def _build_force_tool_prompt(self, user_message: str, user_id: int | None) -> str: user_context = f"Contexto de usuario autenticado: user_id={user_id}.\n" if user_id else "" return ( "Reavalie a mensagem e priorize chamar tool se houver intencao operacional. " "Use texto apenas quando faltar dado obrigatorio.\n\n" f"{user_context}" f"Mensagem do usuario: {user_message}" ) def _build_result_prompt( self, user_message: str, user_id: int | None, tool_name: str, tool_result, ) -> str: user_context = f"Contexto de usuario autenticado: user_id={user_id}.\n" if user_id else "" return ( "Responda ao usuario de forma objetiva usando o resultado da tool abaixo. " "Nao invente dados. Se a lista vier vazia, diga explicitamente que nao encontrou resultados. " "Retorne texto puro sem markdown, sem asteriscos, sem emojis e com linhas curtas.\n\n" f"{user_context}" f"Pergunta original: {user_message}\n" f"Tool executada: {tool_name}\n" f"Resultado da tool: {tool_result}" ) def _http_exception_detail(self, exc: HTTPException) -> str: detail = exc.detail if isinstance(detail, str): return detail return "Nao foi possivel concluir a operacao solicitada." def _format_datetime_for_chat(self, value: str) -> str: try: dt = datetime.fromisoformat((value or "").replace("Z", "+00:00")) return dt.strftime("%d/%m/%Y %H:%M") except Exception: return value or "N/A" def _format_currency_br(self, value) -> str: try: number = float(value) formatted = f"{number:,.2f}".replace(",", "X").replace(".", ",").replace("X", ".") return f"R$ {formatted}" except Exception: return "N/A" def _fallback_format_tool_result(self, tool_name: str, tool_result) -> str: if tool_name == "consultar_estoque" and isinstance(tool_result, list): if not tool_result: return "Nao encontrei nenhum veiculo com os criterios informados." linhas = [f"Encontrei {len(tool_result)} veiculo(s):"] for idx, item in enumerate(tool_result[:10], start=1): modelo = item.get("modelo", "N/A") categoria = item.get("categoria", "N/A") preco = self._format_currency_br(item.get("preco")) linhas.append(f"{idx}. {modelo} ({categoria}) - {preco}") restantes = len(tool_result) - 10 if restantes > 0: linhas.append(f"... e mais {restantes} veiculo(s).") return "\n".join(linhas) if tool_name == "cancelar_pedido" and isinstance(tool_result, dict): numero = tool_result.get("numero_pedido", "N/A") status = tool_result.get("status", "N/A") motivo = tool_result.get("motivo") linhas = [f"Pedido {numero} atualizado.", f"Status: {status}"] if motivo: linhas.append(f"Motivo: {motivo}") return "\n".join(linhas) if tool_name == "realizar_pedido" and isinstance(tool_result, dict): numero = tool_result.get("numero_pedido", "N/A") valor = self._format_currency_br(tool_result.get("valor_veiculo")) return f"Pedido criado com sucesso.\nNumero: {numero}\nValor: {valor}" if tool_name == "agendar_revisao" and isinstance(tool_result, dict): placa = tool_result.get("placa", "N/A") data_hora = self._format_datetime_for_chat(tool_result.get("data_hora", "N/A")) protocolo = tool_result.get("protocolo", "N/A") valor = tool_result.get("valor_revisao") if isinstance(valor, (int, float)): return ( "Revisao agendada com sucesso.\n" f"Protocolo: {protocolo}\n" f"Placa: {placa}\n" f"Data/Hora: {data_hora}\n" f"Valor estimado: {self._format_currency_br(valor)}" ) return ( "Revisao agendada com sucesso.\n" f"Protocolo: {protocolo}\n" f"Placa: {placa}\n" f"Data/Hora: {data_hora}" ) if tool_name == "listar_agendamentos_revisao" and isinstance(tool_result, list): if not tool_result: return "Nao encontrei agendamentos de revisao para sua conta." linhas = [f"Voce tem {len(tool_result)} agendamento(s):"] for idx, item in enumerate(tool_result[:12], start=1): protocolo = item.get("protocolo", "N/A") placa = item.get("placa", "N/A") data_hora = self._format_datetime_for_chat(item.get("data_hora", "N/A")) status = item.get("status", "N/A") linhas.append(f"{idx}) Protocolo: {protocolo}") linhas.append(f"Placa: {placa}") linhas.append(f"Data/Hora: {data_hora} | Status: {status}") if idx < min(len(tool_result), 12): linhas.append("") restantes = len(tool_result) - 12 if restantes > 0: if linhas and linhas[-1] != "": linhas.append("") linhas.append(f"... e mais {restantes} agendamento(s).") return "\n".join(linhas) if tool_name == "cancelar_agendamento_revisao" and isinstance(tool_result, dict): protocolo = tool_result.get("protocolo", "N/A") status = tool_result.get("status", "N/A") placa = tool_result.get("placa", "N/A") data_hora = self._format_datetime_for_chat(tool_result.get("data_hora", "N/A")) return ( "Agendamento atualizado.\n" f"Protocolo: {protocolo}\n" f"Placa: {placa}\n" f"Data/Hora: {data_hora}\n" f"Status: {status}" ) if tool_name == "editar_data_revisao" and isinstance(tool_result, dict): protocolo = tool_result.get("protocolo", "N/A") placa = tool_result.get("placa", "N/A") data_hora = self._format_datetime_for_chat(tool_result.get("data_hora", "N/A")) status = tool_result.get("status", "N/A") return ( "Agendamento remarcado com sucesso.\n" f"Protocolo: {protocolo}\n" f"Placa: {placa}\n" f"Nova data/hora: {data_hora}\n" f"Status: {status}" ) if tool_name == "validar_cliente_venda" and isinstance(tool_result, dict): aprovado = tool_result.get("aprovado") limite = self._format_currency_br(tool_result.get("limite_credito")) score = tool_result.get("score", "N/A") cpf = tool_result.get("cpf", "N/A") if aprovado: return ( "Cliente aprovado para financiamento.\n" f"CPF: {cpf}\n" f"Score: {score}\n" f"Limite: {limite}" ) return ( "Cliente nao aprovado para financiamento.\n" f"CPF: {cpf}\n" f"Score: {score}\n" f"Limite: {limite}" ) return "Operacao concluida com sucesso."