From 14778fac0ba212110c51e687fb610c4c7b81b758 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Vitor=20Hugo=20Belorio=20Sim=C3=A3o?= Date: Mon, 9 Mar 2026 18:20:55 -0300 Subject: [PATCH] =?UTF-8?q?=E2=99=BB=EF=B8=8F=20refactor(orchestration):?= =?UTF-8?q?=20extrair=20componentes=20do=20orquestrador?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Move a normalização, o planejamento de mensagens, a política conversacional e a execução de tools para módulos dedicados. Reduz o acoplamento do OrquestradorService e prepara a base para evoluções futuras com menos risco de regressão. --- .../orchestration/conversation_policy.py | 655 ++++++++++ .../orchestration/entity_normalizer.py | 418 ++++++ app/services/orchestration/message_planner.py | 160 +++ .../orchestration/orquestrador_service.py | 1144 ++--------------- app/services/orchestration/tool_executor.py | 28 + 5 files changed, 1357 insertions(+), 1048 deletions(-) create mode 100644 app/services/orchestration/conversation_policy.py create mode 100644 app/services/orchestration/entity_normalizer.py create mode 100644 app/services/orchestration/message_planner.py create mode 100644 app/services/orchestration/tool_executor.py diff --git a/app/services/orchestration/conversation_policy.py b/app/services/orchestration/conversation_policy.py new file mode 100644 index 0000000..9b48bab --- /dev/null +++ b/app/services/orchestration/conversation_policy.py @@ -0,0 +1,655 @@ +import re +from datetime import datetime, timedelta +from typing import TYPE_CHECKING + +from app.services.orchestration.orchestrator_config import ( + CANCEL_ORDER_REQUIRED_FIELDS, + ORDER_REQUIRED_FIELDS, + PENDING_ORDER_SELECTION_TTL_MINUTES, + REVIEW_REQUIRED_FIELDS, +) + +if TYPE_CHECKING: + from app.services.orchestration.orquestrador_service import OrquestradorService + +# essa classe é responsável por controlar qual o assunto está ativo na conversa, se existe fluxo aberto, se o usuário mandou dois pedidos ao mesmo tempo... +class ConversationPolicy: + def __init__(self, service: "OrquestradorService"): + self.service = service + + # Essa função serve para reaproveitar informações já informadas antes, evitando pedir novamente ao usuário. + def try_prefill_review_fields_from_memory(self, user_id: int | None, payload: dict) -> None: + if user_id is None: + return + context = self.service._get_user_context(user_id) + if not context: + return + memory = context.get("generic_memory", {}) + if payload.get("placa") is None: + plate = self.service.normalizer.normalize_plate(memory.get("placa")) + if plate: + payload["placa"] = plate + + # Essa função coloca um pedido na fila + def queue_order(self, user_id: int | None, domain: str, order_message: str) -> None: + self.queue_order_with_memory_seed( + user_id=user_id, + domain=domain, + order_message=order_message, + memory_seed=self.service._new_tab_memory(user_id=user_id), + ) + + # Enfileira um próximo assunto para ser tratado depois, preservando dados úteis daquele pedido + def queue_order_with_memory_seed( + self, + user_id: int | None, + domain: str, + order_message: str, + memory_seed: dict | None = None, + ) -> None: + context = self.service._get_user_context(user_id) + if not context or domain == "general": + return + queue = context.setdefault("order_queue", []) + queue.append( + { + "domain": domain, + "message": (order_message or "").strip(), + "memory_seed": dict(memory_seed or self.service._new_tab_memory(user_id=user_id)), + "created_at": datetime.utcnow().isoformat(), + } + ) + + + # Transforma as entidades extraídas de um pedido em uma memória temporária pronta para usar quando esse pedido for processado. + def build_order_memory_seed(self, user_id: int | None, order: dict | None = None) -> dict: + seed = dict(self.service._new_tab_memory(user_id=user_id)) + if not isinstance(order, dict): + return seed + + entities = order.get("entities") + if not isinstance(entities, dict): + return seed + + generic_memory = self.service.normalizer.normalize_generic_fields(entities.get("generic_memory")) + if generic_memory: + seed.update(generic_memory) + + review_fields = self.service.normalizer.normalize_review_fields(entities.get("review_fields")) + if review_fields.get("placa") and not seed.get("placa"): + seed["placa"] = review_fields["placa"] + + order_fields = self.service.normalizer.normalize_order_fields(entities.get("order_fields")) + if order_fields.get("cpf") and not seed.get("cpf"): + seed["cpf"] = order_fields["cpf"] + if order_fields.get("valor_veiculo") and not seed.get("orcamento_max"): + seed["orcamento_max"] = int(round(order_fields["valor_veiculo"])) + + return seed + + + # Pega o próximo assunto pendente do usuário. + def pop_next_order(self, user_id: int | None) -> dict | None: + context = self.service._get_user_context(user_id) + if not context: + return None + queue = context.setdefault("order_queue", []) + if not queue: + return None + return queue.pop(0) + + + + # Decide qual pedido deve ser processado agora, qual vai para fila, e se o usuário precisa escolher entre dois pedidos. + def prepare_message_for_single_order( + self, + message: str, + user_id: int | None, + routing_plan: dict | None = None, + ) -> tuple[str, str | None, str | None]: + context = self.service._get_user_context(user_id) + if not context: + return message, None, None + + queue_notice = None + active_domain = context.get("active_domain", "general") + + orders_raw = (routing_plan or {}).get("orders") if isinstance(routing_plan, dict) else None + extracted_orders: list[dict] = [] + if isinstance(orders_raw, list): + for item in orders_raw: + if not isinstance(item, dict): + continue + domain = str(item.get("domain") or "general").strip().lower() + if domain not in {"review", "sales", "general"}: + domain = "general" + segment = str(item.get("message") or "").strip() + if segment: + extracted_orders.append( + { + "domain": domain, + "message": segment, + "entities": self.service._coerce_extraction_contract(item.get("entities")), + } + ) + if not extracted_orders: + extracted_orders = [{"domain": "general", "message": (message or "").strip()}] + + if ( + len(extracted_orders) == 2 + and all(order["domain"] != "general" for order in extracted_orders) + and not self.has_open_flow(user_id=user_id, domain=active_domain) + ): + self.store_pending_order_selection(user_id=user_id, orders=extracted_orders) + return message, None, self.render_order_selection_prompt(extracted_orders) + + if len(extracted_orders) <= 1: + inferred = extracted_orders[0]["domain"] + if ( + inferred != "general" + and inferred != active_domain + and self.has_open_flow(user_id=user_id, domain=active_domain) + ): + self.queue_order(user_id=user_id, domain=inferred, order_message=message) + queue_hint = self.render_queue_notice(1) + prompt = self.render_open_flow_prompt(user_id=user_id, domain=active_domain) + return message, None, f"{prompt}\n{queue_hint}" if queue_hint else prompt + return message, None, None + + if self.has_open_flow(user_id=user_id, domain=active_domain): + queued_count = 0 + for queued in extracted_orders: + if queued["domain"] != active_domain: + self.queue_order_with_memory_seed( + user_id=user_id, + domain=queued["domain"], + order_message=queued["message"], + memory_seed=self.build_order_memory_seed(user_id=user_id, order=queued), + ) + queued_count += 1 + queue_hint = self.render_queue_notice(queued_count) + prompt = self.render_open_flow_prompt(user_id=user_id, domain=active_domain) + return message, None, f"{prompt}\n{queue_hint}" if queue_hint else prompt + + first = extracted_orders[0] + queued_count = 0 + for queued in extracted_orders[1:]: + self.queue_order_with_memory_seed( + user_id=user_id, + domain=queued["domain"], + order_message=queued["message"], + memory_seed=self.build_order_memory_seed(user_id=user_id, order=queued), + ) + queued_count += 1 + context["active_domain"] = first["domain"] + context["generic_memory"] = self.build_order_memory_seed(user_id=user_id, order=first) + + queue_notice = self.render_queue_notice(queued_count) + return first["message"], queue_notice, None + + + # Serve para concatenar mensagens auxiliares com a resposta principal. + def compose_order_aware_response(self, response: str, queue_notice: str | None = None) -> str: + lines = [] + if queue_notice: + lines.append(queue_notice) + lines.append(response) + return "\n".join(lines) + + + # Armazena uma “decisão pendente” de qual pedido começar. + def store_pending_order_selection(self, user_id: int | None, orders: list[dict]) -> None: + context = self.service._get_user_context(user_id) + if not context: + return + context["pending_order_selection"] = { + "orders": [ + { + "domain": order["domain"], + "message": order["message"], + "memory_seed": self.build_order_memory_seed(user_id=user_id, order=order), + } + for order in orders[:2] + ], + "expires_at": datetime.utcnow() + timedelta(minutes=PENDING_ORDER_SELECTION_TTL_MINUTES), + } + + + # Cria o texto de escolha para o usuário. + def render_order_selection_prompt(self, orders: list[dict]) -> str: + if len(orders) < 2: + return "Qual das acoes voce quer iniciar primeiro?" + first_label = self.describe_order_selection_option(orders[0]) + second_label = self.describe_order_selection_option(orders[1]) + return ( + "Identifiquei duas acoes na sua mensagem:\n" + f"1. {first_label}\n" + f"2. {second_label}\n" + "Qual delas voce quer iniciar primeiro? Se for indiferente, eu escolho." + ) + + + # Formata o rótulo do pedido para exibição. + def describe_order_selection_option(self, order: dict) -> str: + domain = str(order.get("domain") or "general") + message = str(order.get("message") or "").strip() + domain_prefix = { + "review": "Revisao", + "sales": "Venda", + "general": "Atendimento", + }.get(domain, "Atendimento") + return f"{domain_prefix}: {message}" + + + # É um helper simples para busca de palavras-chave + def contains_any_term(self, text: str, terms: set[str]) -> bool: + return any(term in text for term in terms) + + + # Limpa a mensagem para facilitar a detecção de escolha + def strip_choice_message(self, text: str) -> str: + cleaned = (text or "").strip() + while cleaned and cleaned[-1] in ".!?,;:": + cleaned = cleaned[:-1].rstrip() + return cleaned + + + # Se o usuário disser algo como: "esquece tudo e quero agendar revisão" a função remove a parte de reset e devolve só o novo pedido. + def remove_order_selection_reset_prefix(self, message: str) -> str: + raw = (message or "").strip() + normalized = self.service.normalizer.normalize_text(raw) + prefixes = ("esqueca tudo agora", "esqueca tudo", "esquece tudo agora", "esquece tudo") + for prefix in prefixes: + if normalized.startswith(prefix): + return raw[len(prefix):].lstrip(" ,.:;-") + return raw + + + # Detecta intenção de abandonar o contexto atual + def is_order_selection_reset_message(self, message: str) -> bool: + normalized = self.service.normalizer.normalize_text(message).strip() + reset_terms = { + "esqueca tudo", + "esqueca tudo agora", + "esquece tudo", + "esquece tudo agora", + "ignora isso", + "ignore isso", + "deixa isso", + "deixa pra la", + "deixa para la", + "novo assunto", + "muda de assunto", + "vamos comecar de novo", + "comecar de novo", + "reiniciar", + "resetar conversa", + } + return self.contains_any_term(normalized, reset_terms) + + + # Ajuda a perceber quando o usuário talvez tenha mudado de assunto sem responder à pergunta de escolha + def looks_like_fresh_operational_request(self, message: str) -> bool: + normalized = self.service.normalizer.normalize_text(message).strip() + if len(normalized) < 15: + return False + operational_terms = { + "agendar", + "revisao", + "cancelar", + "pedido", + "comprar", + "compra", + "carro", + "veiculo", + "remarcar", + "tambem", + } + return self.contains_any_term(normalized, operational_terms) + + + # Interpreta a resposta do usuário na etapa de seleção. + def detect_selected_order_index(self, message: str, orders: list[dict]) -> tuple[int | None, bool]: + normalized = self.strip_choice_message(self.service.normalizer.normalize_text(message)) + indifferent_tokens = { + "tanto faz", + "indiferente", + "qualquer um", + "qualquer uma", + "voce escolhe", + "pode escolher", + "fica a seu criterio", + } + if normalized in indifferent_tokens: + return 0, True + if normalized in {"1", "primeiro", "primeira", "opcao 1", "acao 1", "pedido 1"}: + return 0, False + if normalized in {"2", "segundo", "segunda", "opcao 2", "acao 2", "pedido 2"}: + return 1, False + + review_matches = [index for index, order in enumerate(orders) if order.get("domain") == "review"] + sales_matches = [index for index, order in enumerate(orders) if order.get("domain") == "sales"] + has_review_signal = self.contains_any_term(normalized, {"revisao", "agendamento", "agendar", "remarcar", "pos venda"}) + has_sales_signal = self.contains_any_term(normalized, {"venda", "compra", "comprar", "pedido", "cancelamento", "cancelar", "carro", "veiculo"}) + + if len(review_matches) == 1 and has_review_signal and not has_sales_signal: + return review_matches[0], False + if len(sales_matches) == 1 and has_sales_signal and not has_review_signal: + return sales_matches[0], False + return None, False + + + # É a função que efetivamente trata a resposta do usuário quando você perguntou “qual pedido quer fazer primeiro?”. + async def try_resolve_pending_order_selection(self, message: str, user_id: int | None) -> str | None: + context = self.service._get_user_context(user_id) + if not context: + return None + pending = context.get("pending_order_selection") + if not isinstance(pending, dict): + return None + if pending.get("expires_at") and pending["expires_at"] < datetime.utcnow(): + context["pending_order_selection"] = None + return None + orders = pending.get("orders") or [] + if len(orders) < 2: + context["pending_order_selection"] = None + return None + + if self.is_order_selection_reset_message(message): + self.service._clear_user_conversation_state(user_id=user_id) + cleaned_message = self.remove_order_selection_reset_prefix(message) + if not cleaned_message: + return "Tudo bem. Limpei o contexto atual. Pode me dizer o que voce quer fazer agora?" + return await self.service.handle_message(cleaned_message, user_id=user_id) + + selected_index, auto_selected = self.detect_selected_order_index(message=message, orders=orders) + if selected_index is None: + if self.looks_like_fresh_operational_request(message): + context["pending_order_selection"] = None + return None + return self.render_order_selection_prompt(orders) + + selected_order = orders[selected_index] + remaining_order = orders[1 - selected_index] + context["pending_order_selection"] = None + self.queue_order_with_memory_seed( + user_id=user_id, + domain=remaining_order["domain"], + order_message=remaining_order["message"], + memory_seed=remaining_order.get("memory_seed"), + ) + + intro = ( + f"Vou escolher e comecar por: {self.describe_order_selection_option(selected_order)}" + if auto_selected + else f"Perfeito. Vou comecar por: {self.describe_order_selection_option(selected_order)}" + ) + selected_memory = dict(selected_order.get("memory_seed") or {}) + if selected_memory: + context["generic_memory"] = selected_memory + next_response = await self.service.handle_message(str(selected_order.get("message") or ""), user_id=user_id) + return f"{intro}\n{next_response}" + + + # Cria o aviso de fila. + def render_queue_notice(self, queued_count: int) -> str | None: + if queued_count <= 0: + return None + if queued_count == 1: + return "Anotei mais 1 pedido e sigo nele quando voce disser 'continuar'." + return f"Anotei mais {queued_count} pedidos e sigo neles conforme voce for dizendo 'continuar'." + + + # Mostra ao usuário o que falta concluir no fluxo atual antes de mudar de assunto. + def render_open_flow_prompt(self, user_id: int | None, domain: str) -> str: + if domain == "review" and user_id is not None: + draft = self.service.state.get_entry("pending_review_drafts", user_id, expire=True) + if draft: + missing = [field for field in REVIEW_REQUIRED_FIELDS if field not in draft.get("payload", {})] + if missing: + return self.service._render_missing_review_fields_prompt(missing) + management_draft = self.service.state.get_entry("pending_review_management_drafts", user_id, expire=True) + if management_draft: + action = management_draft.get("action", "cancel") + payload = management_draft.get("payload", {}) + if action == "reschedule": + missing = [field for field in ("protocolo", "nova_data_hora") if field not in payload] + if missing: + return self.service._render_missing_review_reschedule_fields_prompt(missing) + else: + missing = [field for field in ("protocolo",) if field not in payload] + if missing: + return self.service._render_missing_review_cancel_fields_prompt(missing) + if self.service.state.get_entry("pending_review_confirmations", user_id, expire=True): + return "Antes de mudar de assunto, me confirme se podemos concluir seu agendamento de revisao." + if self.service.state.get_entry("pending_review_reuse_confirmations", user_id, expire=True): + return self.service._render_review_reuse_question() + + if domain == "sales" and user_id is not None: + draft = self.service.state.get_entry("pending_order_drafts", user_id, expire=True) + if draft: + missing = [field for field in ORDER_REQUIRED_FIELDS if field not in draft.get("payload", {})] + if missing: + return self.service._render_missing_order_fields_prompt(missing) + cancel_draft = self.service.state.get_entry("pending_cancel_order_drafts", user_id, expire=True) + if cancel_draft: + missing = [field for field in CANCEL_ORDER_REQUIRED_FIELDS if field not in cancel_draft.get("payload", {})] + if missing: + return self.service._render_missing_cancel_order_fields_prompt(missing) + + return "Vamos concluir este assunto primeiro e ja sigo com o proximo em seguida." + + + # Cria a introdução para mudar de assunto de forma natural. + def build_next_order_transition(self, domain: str) -> str: + if domain == "sales": + return "Agora, sobre a compra do veiculo:" + if domain == "review": + return "Agora, sobre o agendamento da revisao:" + return "Agora, sobre o proximo assunto:" + + + # Quando um fluxo termina, ela prepara a passagem para o próximo pedido da fila. + async def maybe_auto_advance_next_order(self, base_response: str, user_id: int | None) -> str: + context = self.service._get_user_context(user_id) + if not context: + return base_response + if context.get("pending_switch"): + return base_response + active_domain = context.get("active_domain", "general") + if self.has_open_flow(user_id=user_id, domain=active_domain): + return base_response + + next_order = self.pop_next_order(user_id=user_id) + if not next_order: + return base_response + + context["pending_switch"] = { + "source_domain": context.get("active_domain", "general"), + "target_domain": next_order["domain"], + "queued_message": next_order["message"], + "memory_seed": dict(next_order.get("memory_seed") or self.service._new_tab_memory(user_id=user_id)), + "expires_at": datetime.utcnow() + timedelta(minutes=15), + } + transition = self.build_next_order_transition(next_order["domain"]) + return ( + f"{base_response}\n\n" + f"{transition}\n" + "Tenho um proximo pedido na fila. Quando quiser, diga 'continuar' para eu seguir nele." + ) + + + # Converte intenções em um domínio principal de atendimento. + def domain_from_intents(self, intents: dict | None) -> str: + normalized = self.service.normalizer.normalize_intents(intents) + review_score = ( + int(normalized.get("review_schedule", False)) + + int(normalized.get("review_list", False)) + + int(normalized.get("review_cancel", False)) + + int(normalized.get("review_reschedule", False)) + ) + sales_score = int(normalized.get("order_create", False)) + int(normalized.get("order_cancel", False)) + if review_score > sales_score and review_score > 0: + return "review" + if sales_score > review_score and sales_score > 0: + return "sales" + return "general" + + + # Detecta comandos de continuação. + def is_context_switch_confirmation(self, message: str) -> bool: + return self.service._is_affirmative_message(message) or self.service._is_negative_message(message) + + + # Executa o próximo pedido da fila quando o usuário disser “continuar”. + def is_continue_queue_message(self, message: str) -> bool: + normalized = self.service.normalizer.normalize_text(message).strip() + normalized = re.sub(r"[.!?,;:]+$", "", normalized) + return normalized in {"continuar", "pode continuar", "seguir", "pode seguir", "proximo", "segue"} + + + # Executa o próximo pedido da fila quando o usuário disser “continuar”. + async def try_continue_queued_order(self, message: str, user_id: int | None) -> str | None: + context = self.service._get_user_context(user_id) + if not context: + return None + pending_switch = context.get("pending_switch") + if not isinstance(pending_switch, dict): + return None + if pending_switch.get("expires_at") and pending_switch["expires_at"] < datetime.utcnow(): + context["pending_switch"] = None + return None + queued_message = str(pending_switch.get("queued_message") or "").strip() + if not queued_message: + return None + + if self.service._is_negative_message(message): + context["pending_switch"] = None + return "Tudo bem. Mantive o proximo pedido fora da fila por enquanto." + if not (self.is_continue_queue_message(message) or self.service._is_affirmative_message(message)): + return None + + target_domain = str(pending_switch.get("target_domain") or "general") + memory_seed = dict(pending_switch.get("memory_seed") or {}) + self.apply_domain_switch(user_id=user_id, target_domain=target_domain) + refreshed = self.service._get_user_context(user_id) + if refreshed is not None: + refreshed["generic_memory"] = memory_seed + transition = self.build_next_order_transition(target_domain) + next_response = await self.service.handle_message(queued_message, user_id=user_id) + return f"{transition}\n{next_response}" + + + # Diz se ainda existe algo pendente antes de encerrar aquele assunto. + def has_open_flow(self, user_id: int | None, domain: str) -> bool: + if user_id is None: + return False + if domain == "review": + return bool( + self.service.state.get_entry("pending_review_drafts", user_id, expire=True) + or self.service.state.get_entry("pending_review_confirmations", user_id, expire=True) + or self.service.state.get_entry("pending_review_management_drafts", user_id, expire=True) + or self.service.state.get_entry("pending_review_reuse_confirmations", user_id, expire=True) + ) + if domain == "sales": + return bool( + self.service.state.get_entry("pending_order_drafts", user_id, expire=True) + or self.service.state.get_entry("pending_cancel_order_drafts", user_id, expire=True) + ) + return False + + + # Encerra o contexto anterior e troca oficialmente para o novo assunto. + def apply_domain_switch(self, user_id: int | None, target_domain: str) -> None: + context = self.service._get_user_context(user_id) + if not context: + return + previous_domain = context.get("active_domain", "general") + if previous_domain == "review": + self.service._reset_pending_review_states(user_id=user_id) + if previous_domain == "sales": + self.service._reset_pending_order_states(user_id=user_id) + context["active_domain"] = target_domain + context["generic_memory"] = self.service._new_tab_memory(user_id=user_id) + context["pending_order_selection"] = None + context["pending_switch"] = None + + + # Controla a confirmação de “você quer mesmo sair deste assunto e ir para outro?”. + def handle_context_switch(self, message: str, user_id: int | None, target_domain_hint: str = "general") -> str | None: + context = self.service._get_user_context(user_id) + if not context: + return None + pending_switch = context.get("pending_switch") + if pending_switch: + if pending_switch["expires_at"] < datetime.utcnow(): + context["pending_switch"] = None + elif self.is_context_switch_confirmation(message): + if self.service._is_affirmative_message(message): + target_domain = pending_switch["target_domain"] + self.apply_domain_switch(user_id=user_id, target_domain=target_domain) + return self.render_context_switched_message(target_domain=target_domain) + context["pending_switch"] = None + return "Perfeito, vamos continuar no fluxo atual." + + pending_order_selection = context.get("pending_order_selection") + if pending_order_selection and pending_order_selection.get("expires_at") < datetime.utcnow(): + context["pending_order_selection"] = None + + current_domain = context.get("active_domain", "general") + if target_domain_hint == "general" or target_domain_hint == current_domain: + return None + if not self.has_open_flow(user_id=user_id, domain=current_domain): + return None + + context["pending_switch"] = { + "source_domain": current_domain, + "target_domain": target_domain_hint, + "expires_at": datetime.utcnow() + timedelta(minutes=15), + } + return self.render_context_switch_confirmation(source_domain=current_domain, target_domain=target_domain_hint) + + + # Marca qual domínio está ativo atualmente. + def update_active_domain(self, user_id: int | None, domain_hint: str = "general") -> None: + context = self.service._get_user_context(user_id) + if context and domain_hint != "general": + context["active_domain"] = domain_hint + + + # Serve para exibir o nome do domínio em mensagens para o usuário. + def domain_label(self, domain: str) -> str: + labels = { + "review": "agendamento de revisao", + "sales": "compra de veiculo", + "general": "atendimento geral", + } + return labels.get(domain, "atendimento") + + + # É o prompt de confirmação da troca. + def render_context_switch_confirmation(self, source_domain: str, target_domain: str) -> str: + return ( + f"Entendi que voce quer sair de {self.domain_label(source_domain)} " + f"e ir para {self.domain_label(target_domain)}. Tem certeza?" + ) + + + #Mensagem exibida após a troca acontecer. + def render_context_switched_message(self, target_domain: str) -> str: + return f"Certo, contexto anterior encerrado. Vamos seguir com {self.domain_label(target_domain)}." + + + # Serve para depuração, observabilidade ou até para alimentar outro componente com um resumo do estado atual. + def build_context_summary(self, user_id: int | None) -> str: + context = self.service._get_user_context(user_id) + if not context: + return "Contexto de conversa: sem contexto ativo." + summary = [f"Contexto de conversa ativo: {self.domain_label(context.get('active_domain', 'general'))}."] + memory = context.get("generic_memory", {}) + if memory: + summary.append(f"Memoria generica temporaria: {memory}.") + order_queue = context.get("order_queue", []) + if order_queue: + summary.append(f"Fila de pedidos pendentes: {len(order_queue)}.") + return " ".join(summary) diff --git a/app/services/orchestration/entity_normalizer.py b/app/services/orchestration/entity_normalizer.py new file mode 100644 index 0000000..c7e3906 --- /dev/null +++ b/app/services/orchestration/entity_normalizer.py @@ -0,0 +1,418 @@ +import json +import logging +import re +import unicodedata +from datetime import datetime, timedelta + + +logger = logging.getLogger(__name__) + + +class EntityNormalizer: + def empty_extraction_payload(self) -> dict: + return { + "generic_memory": {}, + "review_fields": {}, + "review_management_fields": {}, + "order_fields": {}, + "cancel_order_fields": {}, + "intents": {}, + } + + def empty_message_plan(self, message: str) -> dict: + return { + "orders": [ + { + "domain": "general", + "message": (message or "").strip(), + "entities": self.empty_extraction_payload(), + } + ] + } + + def coerce_message_plan(self, payload, message: str) -> dict: + default = self.empty_message_plan(message=message) + if not isinstance(payload, dict): + return default + + raw_orders = payload.get("orders") + if not isinstance(raw_orders, list): + return default + + normalized_orders: list[dict] = [] + for item in raw_orders: + if not isinstance(item, dict): + continue + domain = str(item.get("domain") or "general").strip().lower() + if domain not in {"review", "sales", "general"}: + domain = "general" + segment = str(item.get("message") or "").strip() + if not segment: + continue + normalized_orders.append( + { + "domain": domain, + "message": segment, + "entities": self.coerce_extraction_contract(item.get("entities")), + } + ) + + if not normalized_orders: + return default + return {"orders": normalized_orders} + + def coerce_extraction_contract(self, payload) -> dict: + if not isinstance(payload, dict): + return self.empty_extraction_payload() + contract = self.empty_extraction_payload() + for key in contract: + value = payload.get(key) + contract[key] = value if isinstance(value, dict) else {} + if key not in payload: + logger.info("Extracao sem secao '%s'; usando vazio.", key) + return contract + + def parse_json_object(self, text: str): + candidate = (text or "").strip() + if not candidate: + return None + if candidate.startswith("```"): + candidate = re.sub(r"^```(?:json)?\s*", "", candidate, flags=re.IGNORECASE) + candidate = re.sub(r"\s*```$", "", candidate) + try: + return json.loads(candidate) + except json.JSONDecodeError: + match = re.search(r"\{.*\}", candidate, flags=re.DOTALL) + if not match: + logger.warning("Extracao sem JSON valido no texto retornado.") + return None + try: + return json.loads(match.group(0)) + except json.JSONDecodeError: + logger.warning("Extracao com JSON invalido apos recorte.") + return None + + def normalize_text(self, text: str) -> str: + normalized = unicodedata.normalize("NFKD", text or "") + ascii_text = normalized.encode("ascii", "ignore").decode("ascii") + return ascii_text.lower() + + def normalize_plate(self, value) -> str | None: + text = str(value or "").strip().upper() + if not text: + return None + if re.fullmatch(r"[A-Z]{3}[0-9][A-Z0-9][0-9]{2}", text) or re.fullmatch(r"[A-Z]{3}[0-9]{4}", text): + return text + compact = re.sub(r"[^A-Z0-9]", "", text) + if re.fullmatch(r"[A-Z]{3}[0-9][A-Z0-9][0-9]{2}", compact) or re.fullmatch(r"[A-Z]{3}[0-9]{4}", compact): + return compact + return None + + def normalize_cpf(self, value) -> str | None: + digits = re.sub(r"\D", "", str(value or "")) + if len(digits) == 11: + return digits + return None + + def normalize_positive_number(self, value) -> float | None: + if value is None: + return None + if isinstance(value, (int, float)): + number = float(value) + return number if number > 0 else None + text = self.normalize_text(str(value)) + text = text.replace("r$", "").strip() + multiplier = 1000 if "mil" in text else 1 + text = text.replace("mil", "").strip() + digits = re.sub(r"[^0-9,.\s]", "", text) + if not digits: + return None + numeric = digits.replace(".", "").replace(" ", "").replace(",", ".") + try: + number = float(numeric) * multiplier + return number if number > 0 else None + except ValueError: + return None + + def normalize_vehicle_profile(self, value) -> list[str]: + if value is None: + return [] + allowed = {"suv", "sedan", "hatch", "pickup"} + items = value if isinstance(value, list) else [value] + normalized: list[str] = [] + for item in items: + marker = self.normalize_text(str(item)).strip() + if marker in allowed and marker not in normalized: + normalized.append(marker) + return normalized + + def normalize_bool(self, value) -> bool | None: + if isinstance(value, bool): + return value + lowered = self.normalize_text(str(value or "")).strip() + if lowered in {"sim", "true", "1", "yes"}: + return True + if lowered in {"nao", "false", "0", "no"}: + return False + return None + + def normalize_datetime_connector(self, text: str) -> str: + compact = " ".join(str(text or "").strip().split()) + lowered = compact.lower() + marker = " as " + if marker in lowered: + index = lowered.index(marker) + return f"{compact[:index]} {compact[index + len(marker):]}".strip() + return compact + + def try_parse_iso_datetime(self, text: str) -> datetime | None: + candidate = str(text or "").strip() + if not candidate: + return None + try: + return datetime.fromisoformat(candidate.replace("Z", "+00:00")) + except ValueError: + return None + + def try_parse_datetime_with_formats(self, text: str, formats: tuple[str, ...]) -> datetime | None: + candidate = str(text or "").strip() + if not candidate: + return None + for fmt in formats: + try: + return datetime.strptime(candidate, fmt) + except ValueError: + continue + return None + + def try_parse_review_absolute_datetime(self, text: str) -> datetime | None: + normalized = self.normalize_datetime_connector(text) + parsed = self.try_parse_iso_datetime(normalized) + if parsed is not None: + return parsed + + day_first_formats = ( + "%d/%m/%Y %H:%M", + "%d/%m/%Y %H:%M:%S", + "%d-%m-%Y %H:%M", + "%d-%m-%Y %H:%M:%S", + ) + year_first_formats = ( + "%Y/%m/%d %H:%M", + "%Y/%m/%d %H:%M:%S", + "%Y-%m-%d %H:%M", + "%Y-%m-%d %H:%M:%S", + ) + return self.try_parse_datetime_with_formats(normalized, day_first_formats + year_first_formats) + + def strip_token_edges(self, token: str) -> str: + cleaned = str(token or "").strip() + edge_chars = "[](){}<>,.;:!?\"'`" + while cleaned and cleaned[0] in edge_chars: + cleaned = cleaned[1:] + while cleaned and cleaned[-1] in edge_chars: + cleaned = cleaned[:-1] + return cleaned + + def extract_hhmm_from_text(self, text: str) -> str | None: + cleaned = self.normalize_datetime_connector(text) + for token in cleaned.split(): + normalized_token = self.strip_token_edges(token) + parts = normalized_token.split(":") + if len(parts) not in {2, 3}: + continue + if not all(part.isdigit() for part in parts): + continue + hour = int(parts[0]) + minute = int(parts[1]) + if 0 <= hour <= 23 and 0 <= minute <= 59: + return f"{hour:02d}:{minute:02d}" + return None + + def normalize_review_datetime_text(self, value) -> str | None: + text = str(value or "").strip() + if not text: + return None + + absolute_dt = self.try_parse_review_absolute_datetime(text) + if absolute_dt is not None: + return text + + normalized = self.normalize_text(text) + day_offset = None + if "amanha" in normalized: + day_offset = 1 + elif "hoje" in normalized: + day_offset = 0 + if day_offset is None: + return text + + time_text = self.extract_hhmm_from_text(normalized) + if not time_text: + return text + + hour_text, minute_text = time_text.split(":") + target_date = datetime.now() + timedelta(days=day_offset) + return f"{target_date.strftime('%d/%m/%Y')} {int(hour_text):02d}:{int(minute_text):02d}" + + def normalize_generic_fields(self, data) -> dict: + if not isinstance(data, dict): + return {} + extracted: dict = {} + plate = self.normalize_plate(data.get("placa")) + if plate: + extracted["placa"] = plate + cpf = self.normalize_cpf(data.get("cpf")) + if cpf: + extracted["cpf"] = cpf + budget = self.normalize_positive_number(data.get("orcamento_max")) + if budget: + extracted["orcamento_max"] = int(round(budget)) + profile = self.normalize_vehicle_profile(data.get("perfil_veiculo")) + if profile: + extracted["perfil_veiculo"] = profile + return extracted + + def normalize_review_fields(self, data) -> dict: + if not isinstance(data, dict): + return {} + extracted: dict = {} + plate = self.normalize_plate(data.get("placa")) + if plate: + extracted["placa"] = plate + date_time = self.normalize_review_datetime_text(data.get("data_hora")) + if date_time: + extracted["data_hora"] = date_time + model = str(data.get("modelo") or "").strip(" ,.;") + if model: + extracted["modelo"] = model.title() + year = self.normalize_positive_number(data.get("ano")) + if year: + year_int = int(round(year)) + if 1900 <= year_int <= 2100: + extracted["ano"] = year_int + km = self.normalize_positive_number(data.get("km")) + if km: + extracted["km"] = int(round(km)) + reviewed = self.normalize_bool(data.get("revisao_previa_concessionaria")) + if reviewed is not None: + extracted["revisao_previa_concessionaria"] = reviewed + return extracted + + def tokenize_text(self, text: str) -> list[str]: + return [token for token in str(text or "").split() if token] + + def clean_protocol_token(self, token: str) -> str: + cleaned = str(token or "").strip().upper() + edge_chars = "[](){}<>,.;:!?\"'`" + while cleaned and cleaned[0] in edge_chars: + cleaned = cleaned[1:] + while cleaned and cleaned[-1] in edge_chars: + cleaned = cleaned[:-1] + return cleaned + + def is_valid_protocol_suffix(self, value: str) -> bool: + if not value or len(value) < 4: + return False + return all(char.isalnum() for char in value) + + def normalize_review_protocol(self, value: str) -> str | None: + candidate = self.clean_protocol_token(value) + if not candidate.startswith("REV-"): + return None + parts = candidate.split("-") + if len(parts) != 3: + return None + prefix, date_part, suffix_part = parts + if prefix != "REV": + return None + if len(date_part) != 8 or not date_part.isdigit(): + return None + try: + datetime.strptime(date_part, "%Y%m%d") + except ValueError: + return None + if not self.is_valid_protocol_suffix(suffix_part): + return None + return f"{prefix}-{date_part}-{suffix_part}" + + def extract_review_protocol_from_text(self, text: str) -> str | None: + for token in self.tokenize_text(text): + normalized = self.normalize_review_protocol(token) + if normalized: + return normalized + return self.normalize_review_protocol(str(text or "")) + + def normalize_review_management_fields(self, data) -> dict: + if not isinstance(data, dict): + return {} + extracted: dict = {} + raw_protocol = data.get("protocolo") or data.get("numero_protocolo") or data.get("codigo") + protocol = self.extract_review_protocol_from_text(str(raw_protocol or "")) + if protocol: + extracted["protocolo"] = protocol + new_datetime = self.normalize_review_datetime_text(data.get("nova_data_hora")) + if new_datetime: + extracted["nova_data_hora"] = new_datetime + reason = str(data.get("motivo") or "").strip(" .;") + if reason: + extracted["motivo"] = reason + return extracted + + def normalize_order_fields(self, data) -> dict: + if not isinstance(data, dict): + return {} + extracted: dict = {} + cpf = self.normalize_cpf(data.get("cpf")) + if cpf: + extracted["cpf"] = cpf + value = self.normalize_positive_number(data.get("valor_veiculo")) + if value: + extracted["valor_veiculo"] = round(value, 2) + return extracted + + def normalize_cancel_order_fields(self, data) -> dict: + if not isinstance(data, dict): + return {} + extracted: dict = {} + order_number = str(data.get("numero_pedido") or "").strip().upper() + if order_number and re.fullmatch(r"PED-[A-Z0-9\\-]+", order_number): + extracted["numero_pedido"] = order_number + reason = str(data.get("motivo") or "").strip(" .;") + if reason: + extracted["motivo"] = reason + return extracted + + def normalize_intents(self, data) -> dict: + if not isinstance(data, dict): + data = {} + return { + "review_schedule": bool(self.normalize_bool(data.get("review_schedule"))), + "review_list": bool(self.normalize_bool(data.get("review_list"))), + "review_cancel": bool(self.normalize_bool(data.get("review_cancel"))), + "review_reschedule": bool(self.normalize_bool(data.get("review_reschedule"))), + "order_create": bool(self.normalize_bool(data.get("order_create"))), + "order_cancel": bool(self.normalize_bool(data.get("order_cancel"))), + } + + def has_useful_extraction(self, extraction: dict | None) -> bool: + if not isinstance(extraction, dict): + return False + intents = self.normalize_intents(extraction.get("intents")) + if any(intents.values()): + return True + return any( + bool(extraction.get(key)) + for key in ("generic_memory", "review_fields", "review_management_fields", "order_fields", "cancel_order_fields") + ) + + def has_operational_intent(self, extracted_entities: dict | None) -> bool: + if not isinstance(extracted_entities, dict): + return False + intents = self.normalize_intents(extracted_entities.get("intents")) + if any(intents.values()): + return True + return any( + bool(extracted_entities.get(key)) + for key in ("review_fields", "review_management_fields", "order_fields", "cancel_order_fields") + ) diff --git a/app/services/orchestration/message_planner.py b/app/services/orchestration/message_planner.py new file mode 100644 index 0000000..cb52ac2 --- /dev/null +++ b/app/services/orchestration/message_planner.py @@ -0,0 +1,160 @@ +import logging + +from app.services.ai.llm_service import LLMService +from app.services.orchestration.entity_normalizer import EntityNormalizer + + +logger = logging.getLogger(__name__) + + +class MessagePlanner: + def __init__(self, llm: LLMService, normalizer: EntityNormalizer): + self.llm = llm + self.normalizer = normalizer + + async def extract_message_plan(self, message: str, user_id: int | None) -> dict: + prompt = ( + "Analise a mensagem e retorne APENAS JSON valido com roteamento e entidades por pedido.\n" + "Sem markdown e sem texto extra.\n\n" + "Formato:\n" + "{\n" + ' "orders": [\n' + " {\n" + ' "domain": "review|sales|general",\n' + ' "message": "trecho literal do pedido",\n' + ' "entities": {\n' + ' "generic_memory": {"placa": null, "cpf": null, "orcamento_max": null, "perfil_veiculo": []},\n' + ' "review_fields": {"placa": null, "data_hora": null, "modelo": null, "ano": null, "km": null, "revisao_previa_concessionaria": null},\n' + ' "review_management_fields": {"protocolo": null, "nova_data_hora": null, "motivo": null},\n' + ' "order_fields": {"cpf": null, "valor_veiculo": null},\n' + ' "cancel_order_fields": {"numero_pedido": null, "motivo": null},\n' + ' "intents": {"review_schedule": false, "review_list": false, "review_cancel": false, "review_reschedule": false, "order_create": false, "order_cancel": false}\n' + " }\n" + " }\n" + " ]\n" + "}\n\n" + "Regras:\n" + "- Se houver mais de um pedido operacional, separe em itens distintos em ordem de aparicao.\n" + "- Se nao houver pedido operacional, use domain='general' com a mensagem inteira.\n" + "- Mantenha cada message curta e fiel ao texto do usuario.\n\n" + f"Contexto: user_id={user_id if user_id is not None else 'anonimo'}\n" + f"Mensagem do usuario: {message}" + ) + default = self.normalizer.empty_message_plan(message=message) + try: + result = await self.llm.generate_response(message=prompt, tools=[]) + text = (result.get("response") or "").strip() + payload = self.normalizer.parse_json_object(text) + if not isinstance(payload, dict): + logger.warning("Plano de mensagem invalido (nao JSON objeto). user_id=%s", user_id) + return default + return self.normalizer.coerce_message_plan(payload=payload, message=message) + except Exception: + logger.exception("Falha ao extrair plano da mensagem com LLM. user_id=%s", user_id) + return default + + async def extract_routing(self, message: str, user_id: int | None) -> dict: + plan = await self.extract_message_plan(message=message, user_id=user_id) + return { + "orders": [ + { + "domain": item.get("domain", "general"), + "message": item.get("message", ""), + } + for item in plan.get("orders", []) + ] + } + + async def extract_entities(self, message: str, user_id: int | None) -> dict: + user_context = f"user_id={user_id}" if user_id is not None else "user_id=anonimo" + prompt = ( + "Extraia entidades da mensagem do usuario e retorne APENAS JSON valido.\n" + "Nao use markdown, nao adicione texto antes/depois, nao invente dados ausentes.\n" + "Se nao houver valor, use null ou lista vazia.\n\n" + "Formato obrigatorio:\n" + "{\n" + ' "generic_memory": {\n' + ' "placa": null,\n' + ' "cpf": null,\n' + ' "orcamento_max": null,\n' + ' "perfil_veiculo": []\n' + " },\n" + ' "review_fields": {\n' + ' "placa": null,\n' + ' "data_hora": null,\n' + ' "modelo": null,\n' + ' "ano": null,\n' + ' "km": null,\n' + ' "revisao_previa_concessionaria": null\n' + " },\n" + ' "review_management_fields": {\n' + ' "protocolo": null,\n' + ' "nova_data_hora": null,\n' + ' "motivo": null\n' + " },\n" + ' "order_fields": {\n' + ' "cpf": null,\n' + ' "valor_veiculo": null\n' + " },\n" + ' "cancel_order_fields": {\n' + ' "numero_pedido": null,\n' + ' "motivo": null\n' + " },\n" + ' "intents": {\n' + ' "review_schedule": false,\n' + ' "review_list": false,\n' + ' "review_cancel": false,\n' + ' "review_reschedule": false,\n' + ' "order_create": false,\n' + ' "order_cancel": false\n' + " }\n" + "}\n\n" + f"Contexto: {user_context}\n" + f"Mensagem do usuario: {message}" + ) + + default = self.normalizer.empty_extraction_payload() + try: + result = await self.llm.generate_response(message=prompt, tools=[]) + text = (result.get("response") or "").strip() + if not text: + logger.warning("Extracao vazia do LLM. user_id=%s", user_id) + return default + payload = self.normalizer.parse_json_object(text) + if not isinstance(payload, dict): + logger.warning("Extracao invalida (nao JSON objeto). user_id=%s", user_id) + return default + return self.normalize_extraction_payload(payload) + except Exception: + logger.exception("Falha ao extrair entidades com LLM. user_id=%s", user_id) + return default + + def resolve_entities_for_message_plan(self, message_plan: dict, routed_message: str) -> dict: + default = self.normalizer.empty_extraction_payload() + if not isinstance(message_plan, dict): + return default + + target = (routed_message or "").strip() + raw_orders = message_plan.get("orders") + if not isinstance(raw_orders, list): + return default + + for item in raw_orders: + if not isinstance(item, dict): + continue + segment = str(item.get("message") or "").strip() + if segment != target: + continue + return self.normalize_extraction_payload(item.get("entities")) + return default + + def normalize_extraction_payload(self, payload) -> dict: + coerced = self.normalizer.coerce_extraction_contract(payload) + return { + "generic_memory": self.normalizer.normalize_generic_fields(coerced.get("generic_memory")), + "review_fields": self.normalizer.normalize_review_fields(coerced.get("review_fields")), + "review_management_fields": self.normalizer.normalize_review_management_fields(coerced.get("review_management_fields")), + "order_fields": self.normalizer.normalize_order_fields(coerced.get("order_fields")), + "cancel_order_fields": self.normalizer.normalize_cancel_order_fields(coerced.get("cancel_order_fields")), + "intents": self.normalizer.normalize_intents(coerced.get("intents")), + } diff --git a/app/services/orchestration/orquestrador_service.py b/app/services/orchestration/orquestrador_service.py index a78826c..b9906b8 100644 --- a/app/services/orchestration/orquestrador_service.py +++ b/app/services/orchestration/orquestrador_service.py @@ -1,22 +1,20 @@ -import re -import json import logging -import unicodedata from datetime import datetime, timedelta from fastapi import HTTPException from sqlalchemy.orm import Session from app.services.orchestration.orchestrator_config import ( - DETERMINISTIC_RESPONSE_TOOLS, LOW_VALUE_RESPONSES, ORCHESTRATION_CONTROL_TOOLS, - PENDING_ORDER_SELECTION_TTL_MINUTES, PENDING_REVIEW_TTL_MINUTES, USER_CONTEXT_TTL_MINUTES, ) -from app.services.orchestration.conversation_state_store import ConversationStateStore from app.services.ai.llm_service import LLMService +from app.services.orchestration.conversation_policy import ConversationPolicy +from app.services.orchestration.conversation_state_store import ConversationStateStore +from app.services.orchestration.entity_normalizer import EntityNormalizer +from app.services.orchestration.message_planner import MessagePlanner from app.services.flows.order_flow import OrderFlowMixin from app.services.orchestration.prompt_builders import ( build_force_tool_prompt, @@ -24,19 +22,24 @@ from app.services.orchestration.prompt_builders import ( build_router_prompt, ) from app.services.flows.review_flow import ReviewFlowMixin -from app.services.orchestration.response_formatter import fallback_format_tool_result +from app.services.orchestration.tool_executor import ToolExecutor from app.services.tools.tool_registry import ToolRegistry logger = logging.getLogger(__name__) class OrquestradorService(ReviewFlowMixin, OrderFlowMixin): - state = ConversationStateStore() + default_state_repository = ConversationStateStore() def __init__(self, db: Session): """Inicializa servicos de LLM e registro de tools para a sessao atual.""" + self.state = self.default_state_repository self.llm = LLMService() + self.normalizer = EntityNormalizer() + self.planner = MessagePlanner(llm=self.llm, normalizer=self.normalizer) self.registry = ToolRegistry(db, extra_handlers=self._build_orchestration_tool_handlers()) + self.tool_executor = ToolExecutor(registry=self.registry) + self.policy = ConversationPolicy(service=self) def _build_orchestration_tool_handlers(self) -> dict: return { @@ -202,7 +205,7 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin): arguments = llm_result["tool_call"]["arguments"] try: - tool_result = await self.registry.execute( + tool_result = await self.tool_executor.execute( tool_name, arguments, user_id=user_id, @@ -269,7 +272,7 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin): tool_name = tool_call.get("name") if tool_name in ORCHESTRATION_CONTROL_TOOLS: try: - tool_result = await self.registry.execute( + tool_result = await self.tool_executor.execute( tool_name, tool_call.get("arguments") or {}, user_id=user_id, @@ -305,7 +308,7 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin): return None try: - tool_result = await self.registry.execute( + tool_result = await self.tool_executor.execute( forced_tool_name, forced_tool_call.get("arguments") or {}, user_id=user_id, @@ -506,593 +509,112 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin): return dict(shared) def _empty_extraction_payload(self) -> dict: - return { - "generic_memory": {}, - "review_fields": {}, - "review_management_fields": {}, - "order_fields": {}, - "cancel_order_fields": {}, - "intents": {}, - } + return self.normalizer.empty_extraction_payload() def _empty_message_plan(self, message: str) -> dict: - return { - "orders": [ - { - "domain": "general", - "message": (message or "").strip(), - "entities": self._empty_extraction_payload(), - } - ] - } + return self.normalizer.empty_message_plan(message) def _coerce_message_plan(self, payload, message: str) -> dict: - default = self._empty_message_plan(message=message) - if not isinstance(payload, dict): - return default - - raw_orders = payload.get("orders") - if not isinstance(raw_orders, list): - return default - - normalized_orders: list[dict] = [] - for item in raw_orders: - if not isinstance(item, dict): - continue - domain = str(item.get("domain") or "general").strip().lower() - if domain not in {"review", "sales", "general"}: - domain = "general" - segment = str(item.get("message") or "").strip() - if not segment: - continue - normalized_orders.append( - { - "domain": domain, - "message": segment, - "entities": self._coerce_extraction_contract(item.get("entities")), - } - ) - - if not normalized_orders: - return default - return {"orders": normalized_orders} + return self.normalizer.coerce_message_plan(payload=payload, message=message) def _coerce_extraction_contract(self, payload) -> dict: - if not isinstance(payload, dict): - return self._empty_extraction_payload() - contract = self._empty_extraction_payload() - for key in contract: - value = payload.get(key) - contract[key] = value if isinstance(value, dict) else {} - if key not in payload: - logger.info("Extracao sem secao '%s'; usando vazio.", key) - return contract + return self.normalizer.coerce_extraction_contract(payload) async def _extract_message_plan_with_llm(self, message: str, user_id: int | None) -> dict: - prompt = ( - "Analise a mensagem e retorne APENAS JSON valido com roteamento e entidades por pedido.\n" - "Sem markdown e sem texto extra.\n\n" - "Formato:\n" - "{\n" - ' "orders": [\n' - " {\n" - ' "domain": "review|sales|general",\n' - ' "message": "trecho literal do pedido",\n' - ' "entities": {\n' - ' "generic_memory": {"placa": null, "cpf": null, "orcamento_max": null, "perfil_veiculo": []},\n' - ' "review_fields": {"placa": null, "data_hora": null, "modelo": null, "ano": null, "km": null, "revisao_previa_concessionaria": null},\n' - ' "review_management_fields": {"protocolo": null, "nova_data_hora": null, "motivo": null},\n' - ' "order_fields": {"cpf": null, "valor_veiculo": null},\n' - ' "cancel_order_fields": {"numero_pedido": null, "motivo": null},\n' - ' "intents": {"review_schedule": false, "review_list": false, "review_cancel": false, "review_reschedule": false, "order_create": false, "order_cancel": false}\n' - " }\n" - " }\n" - " ]\n" - "}\n\n" - "Regras:\n" - "- Se houver mais de um pedido operacional, separe em itens distintos em ordem de aparicao.\n" - "- Se nao houver pedido operacional, use domain='general' com a mensagem inteira.\n" - "- Mantenha cada message curta e fiel ao texto do usuario.\n\n" - f"Contexto: user_id={user_id if user_id is not None else 'anonimo'}\n" - f"Mensagem do usuario: {message}" - ) - default = self._empty_message_plan(message=message) - try: - result = await self.llm.generate_response(message=prompt, tools=[]) - text = (result.get("response") or "").strip() - payload = self._parse_json_object(text) - if not isinstance(payload, dict): - logger.warning("Plano de mensagem invalido (nao JSON objeto). user_id=%s", user_id) - return default - return self._coerce_message_plan(payload=payload, message=message) - except Exception: - logger.exception("Falha ao extrair plano da mensagem com LLM. user_id=%s", user_id) - return default + return await self.planner.extract_message_plan(message=message, user_id=user_id) async def _extract_routing_with_llm(self, message: str, user_id: int | None) -> dict: - plan = await self._extract_message_plan_with_llm(message=message, user_id=user_id) - return { - "orders": [ - { - "domain": item.get("domain", "general"), - "message": item.get("message", ""), - } - for item in plan.get("orders", []) - ] - } + return await self.planner.extract_routing(message=message, user_id=user_id) async def _extract_entities_with_llm(self, message: str, user_id: int | None) -> dict: - user_context = f"user_id={user_id}" if user_id is not None else "user_id=anonimo" - prompt = ( - "Extraia entidades da mensagem do usuario e retorne APENAS JSON valido.\n" - "Nao use markdown, nao adicione texto antes/depois, nao invente dados ausentes.\n" - "Se nao houver valor, use null ou lista vazia.\n\n" - "Formato obrigatorio:\n" - "{\n" - ' "generic_memory": {\n' - ' "placa": null,\n' - ' "cpf": null,\n' - ' "orcamento_max": null,\n' - ' "perfil_veiculo": []\n' - " },\n" - ' "review_fields": {\n' - ' "placa": null,\n' - ' "data_hora": null,\n' - ' "modelo": null,\n' - ' "ano": null,\n' - ' "km": null,\n' - ' "revisao_previa_concessionaria": null\n' - " },\n" - ' "review_management_fields": {\n' - ' "protocolo": null,\n' - ' "nova_data_hora": null,\n' - ' "motivo": null\n' - " },\n" - ' "order_fields": {\n' - ' "cpf": null,\n' - ' "valor_veiculo": null\n' - " },\n" - ' "cancel_order_fields": {\n' - ' "numero_pedido": null,\n' - ' "motivo": null\n' - " },\n" - ' "intents": {\n' - ' "review_schedule": false,\n' - ' "review_list": false,\n' - ' "review_cancel": false,\n' - ' "review_reschedule": false,\n' - ' "order_create": false,\n' - ' "order_cancel": false\n' - " }\n" - "}\n\n" - f"Contexto: {user_context}\n" - f"Mensagem do usuario: {message}" - ) - - default = self._empty_extraction_payload() - try: - result = await self.llm.generate_response(message=prompt, tools=[]) - text = (result.get("response") or "").strip() - if not text: - logger.warning("Extracao vazia do LLM. user_id=%s", user_id) - return default - payload = self._parse_json_object(text) - if not isinstance(payload, dict): - logger.warning("Extracao invalida (nao JSON objeto). user_id=%s", user_id) - return default - coerced = self._coerce_extraction_contract(payload) - return { - "generic_memory": self._normalize_generic_fields(coerced.get("generic_memory")), - "review_fields": self._normalize_review_fields(coerced.get("review_fields")), - "review_management_fields": self._normalize_review_management_fields(coerced.get("review_management_fields")), - "order_fields": self._normalize_order_fields(coerced.get("order_fields")), - "cancel_order_fields": self._normalize_cancel_order_fields(coerced.get("cancel_order_fields")), - "intents": self._normalize_intents(coerced.get("intents")), - } - except Exception: - logger.exception("Falha ao extrair entidades com LLM. user_id=%s", user_id) - return default + return await self.planner.extract_entities(message=message, user_id=user_id) def _resolve_entities_for_message_plan(self, message_plan: dict, routed_message: str) -> dict: - default = self._empty_extraction_payload() - if not isinstance(message_plan, dict): - return default - - target = (routed_message or "").strip() - raw_orders = message_plan.get("orders") - if not isinstance(raw_orders, list): - return default - - for item in raw_orders: - if not isinstance(item, dict): - continue - segment = str(item.get("message") or "").strip() - if segment != target: - continue - entities = self._coerce_extraction_contract(item.get("entities")) - return { - "generic_memory": self._normalize_generic_fields(entities.get("generic_memory")), - "review_fields": self._normalize_review_fields(entities.get("review_fields")), - "review_management_fields": self._normalize_review_management_fields(entities.get("review_management_fields")), - "order_fields": self._normalize_order_fields(entities.get("order_fields")), - "cancel_order_fields": self._normalize_cancel_order_fields(entities.get("cancel_order_fields")), - "intents": self._normalize_intents(entities.get("intents")), - } - return default + return self.planner.resolve_entities_for_message_plan(message_plan=message_plan, routed_message=routed_message) def _has_useful_extraction(self, extraction: dict | None) -> bool: - if not isinstance(extraction, dict): - return False - intents = self._normalize_intents(extraction.get("intents")) - if any(intents.values()): - return True - return any( - bool(extraction.get(key)) - for key in ("generic_memory", "review_fields", "review_management_fields", "order_fields", "cancel_order_fields") - ) + return self.normalizer.has_useful_extraction(extraction) def _parse_json_object(self, text: str): - candidate = (text or "").strip() - if not candidate: - return None - if candidate.startswith("```"): - candidate = re.sub(r"^```(?:json)?\s*", "", candidate, flags=re.IGNORECASE) - candidate = re.sub(r"\s*```$", "", candidate) - try: - return json.loads(candidate) - except json.JSONDecodeError: - match = re.search(r"\{.*\}", candidate, flags=re.DOTALL) - if not match: - logger.warning("Extracao sem JSON valido no texto retornado.") - return None - try: - return json.loads(match.group(0)) - except json.JSONDecodeError: - logger.warning("Extracao com JSON invalido apos recorte.") - return None + return self.normalizer.parse_json_object(text) def _normalize_plate(self, value) -> str | None: - text = str(value or "").strip().upper() - if not text: - return None - if re.fullmatch(r"[A-Z]{3}[0-9][A-Z0-9][0-9]{2}", text) or re.fullmatch(r"[A-Z]{3}[0-9]{4}", text): - return text - compact = re.sub(r"[^A-Z0-9]", "", text) - if re.fullmatch(r"[A-Z]{3}[0-9][A-Z0-9][0-9]{2}", compact) or re.fullmatch(r"[A-Z]{3}[0-9]{4}", compact): - return compact - return None + return self.normalizer.normalize_plate(value) def _normalize_cpf(self, value) -> str | None: - digits = re.sub(r"\D", "", str(value or "")) - if len(digits) == 11: - return digits - return None + return self.normalizer.normalize_cpf(value) def _normalize_positive_number(self, value) -> float | None: - if value is None: - return None - if isinstance(value, (int, float)): - number = float(value) - return number if number > 0 else None - text = self._normalize_text(str(value)) - text = text.replace("r$", "").strip() - multiplier = 1000 if "mil" in text else 1 - text = text.replace("mil", "").strip() - digits = re.sub(r"[^0-9,.\s]", "", text) - if not digits: - return None - numeric = digits.replace(".", "").replace(" ", "").replace(",", ".") - try: - number = float(numeric) * multiplier - return number if number > 0 else None - except ValueError: - return None + return self.normalizer.normalize_positive_number(value) def _normalize_vehicle_profile(self, value) -> list[str]: - if value is None: - return [] - allowed = {"suv", "sedan", "hatch", "pickup"} - items = value if isinstance(value, list) else [value] - normalized: list[str] = [] - for item in items: - marker = self._normalize_text(str(item)).strip() - if marker in allowed and marker not in normalized: - normalized.append(marker) - return normalized + return self.normalizer.normalize_vehicle_profile(value) def _normalize_bool(self, value) -> bool | None: - if isinstance(value, bool): - return value - lowered = self._normalize_text(str(value or "")).strip() - if lowered in {"sim", "true", "1", "yes"}: - return True - if lowered in {"nao", "false", "0", "no"}: - return False - return None + return self.normalizer.normalize_bool(value) def _normalize_datetime_connector(self, text: str) -> str: - compact = " ".join(str(text or "").strip().split()) - lowered = compact.lower() - marker = " as " - if marker in lowered: - index = lowered.index(marker) - return f"{compact[:index]} {compact[index + len(marker):]}".strip() - return compact + return self.normalizer.normalize_datetime_connector(text) def _try_parse_iso_datetime(self, text: str) -> datetime | None: - candidate = str(text or "").strip() - if not candidate: - return None - try: - return datetime.fromisoformat(candidate.replace("Z", "+00:00")) - except ValueError: - return None + return self.normalizer.try_parse_iso_datetime(text) def _try_parse_datetime_with_formats(self, text: str, formats: tuple[str, ...]) -> datetime | None: - candidate = str(text or "").strip() - if not candidate: - return None - for fmt in formats: - try: - return datetime.strptime(candidate, fmt) - except ValueError: - continue - return None + return self.normalizer.try_parse_datetime_with_formats(text, formats) def _try_parse_review_absolute_datetime(self, text: str) -> datetime | None: - normalized = self._normalize_datetime_connector(text) - parsed = self._try_parse_iso_datetime(normalized) - if parsed is not None: - return parsed - - day_first_formats = ( - "%d/%m/%Y %H:%M", - "%d/%m/%Y %H:%M:%S", - "%d-%m-%Y %H:%M", - "%d-%m-%Y %H:%M:%S", - ) - year_first_formats = ( - "%Y/%m/%d %H:%M", - "%Y/%m/%d %H:%M:%S", - "%Y-%m-%d %H:%M", - "%Y-%m-%d %H:%M:%S", - ) - return self._try_parse_datetime_with_formats(normalized, day_first_formats + year_first_formats) + return self.normalizer.try_parse_review_absolute_datetime(text) def _extract_hhmm_from_text(self, text: str) -> str | None: - cleaned = self._normalize_datetime_connector(text) - for token in cleaned.split(): - normalized_token = self._strip_token_edges(token) - parts = normalized_token.split(":") - if len(parts) not in {2, 3}: - continue - if not all(part.isdigit() for part in parts): - continue - hour = int(parts[0]) - minute = int(parts[1]) - if 0 <= hour <= 23 and 0 <= minute <= 59: - return f"{hour:02d}:{minute:02d}" - return None + return self.normalizer.extract_hhmm_from_text(text) def _normalize_review_datetime_text(self, value) -> str | None: - text = str(value or "").strip() - if not text: - return None - - absolute_dt = self._try_parse_review_absolute_datetime(text) - if absolute_dt is not None: - return text - - normalized = self._normalize_text(text) - day_offset = None - if "amanha" in normalized: - day_offset = 1 - elif "hoje" in normalized: - day_offset = 0 - if day_offset is None: - return text - - time_text = self._extract_hhmm_from_text(normalized) - if not time_text: - return text - - hour_text, minute_text = time_text.split(":") - hour = int(hour_text) - minute = int(minute_text) - target_date = datetime.now() + timedelta(days=day_offset) - return f"{target_date.strftime('%d/%m/%Y')} {hour:02d}:{minute:02d}" + return self.normalizer.normalize_review_datetime_text(value) def _normalize_generic_fields(self, data) -> dict: - if not isinstance(data, dict): - return {} - extracted: dict = {} - plate = self._normalize_plate(data.get("placa")) - if plate: - extracted["placa"] = plate - cpf = self._normalize_cpf(data.get("cpf")) - if cpf: - extracted["cpf"] = cpf - budget = self._normalize_positive_number(data.get("orcamento_max")) - if budget: - extracted["orcamento_max"] = int(round(budget)) - profile = self._normalize_vehicle_profile(data.get("perfil_veiculo")) - if profile: - extracted["perfil_veiculo"] = profile - return extracted + return self.normalizer.normalize_generic_fields(data) def _normalize_review_fields(self, data) -> dict: - if not isinstance(data, dict): - return {} - extracted: dict = {} - plate = self._normalize_plate(data.get("placa")) - if plate: - extracted["placa"] = plate - date_time = self._normalize_review_datetime_text(data.get("data_hora")) - if date_time: - extracted["data_hora"] = date_time - model = str(data.get("modelo") or "").strip(" ,.;") - if model: - extracted["modelo"] = model.title() - year = self._normalize_positive_number(data.get("ano")) - if year: - year_int = int(round(year)) - if 1900 <= year_int <= 2100: - extracted["ano"] = year_int - km = self._normalize_positive_number(data.get("km")) - if km: - extracted["km"] = int(round(km)) - reviewed = self._normalize_bool(data.get("revisao_previa_concessionaria")) - if reviewed is not None: - extracted["revisao_previa_concessionaria"] = reviewed - return extracted + return self.normalizer.normalize_review_fields(data) def _tokenize_text(self, text: str) -> list[str]: - return [token for token in str(text or "").split() if token] + return self.normalizer.tokenize_text(text) def _clean_protocol_token(self, token: str) -> str: - cleaned = str(token or "").strip().upper() - edge_chars = "[](){}<>,.;:!?\"'`" - while cleaned and cleaned[0] in edge_chars: - cleaned = cleaned[1:] - while cleaned and cleaned[-1] in edge_chars: - cleaned = cleaned[:-1] - return cleaned + return self.normalizer.clean_protocol_token(token) def _strip_token_edges(self, token: str) -> str: - cleaned = str(token or "").strip() - edge_chars = "[](){}<>,.;:!?\"'`" - while cleaned and cleaned[0] in edge_chars: - cleaned = cleaned[1:] - while cleaned and cleaned[-1] in edge_chars: - cleaned = cleaned[:-1] - return cleaned + return self.normalizer.strip_token_edges(token) def _is_valid_protocol_suffix(self, value: str) -> bool: - if not value or len(value) < 4: - return False - return all(char.isalnum() for char in value) + return self.normalizer.is_valid_protocol_suffix(value) def _normalize_review_protocol(self, value: str) -> str | None: - candidate = self._clean_protocol_token(value) - if not candidate.startswith("REV-"): - return None - - parts = candidate.split("-") - if len(parts) != 3: - return None - - prefix, date_part, suffix_part = parts - if prefix != "REV": - return None - if len(date_part) != 8 or not date_part.isdigit(): - return None - try: - datetime.strptime(date_part, "%Y%m%d") - except ValueError: - return None - if not self._is_valid_protocol_suffix(suffix_part): - return None - return f"{prefix}-{date_part}-{suffix_part}" + return self.normalizer.normalize_review_protocol(value) def _extract_review_protocol_from_text(self, text: str) -> str | None: - for token in self._tokenize_text(text): - normalized = self._normalize_review_protocol(token) - if normalized: - return normalized - normalized_full = self._normalize_review_protocol(str(text or "")) - if normalized_full: - return normalized_full - return None + return self.normalizer.extract_review_protocol_from_text(text) def _normalize_review_management_fields(self, data) -> dict: - if not isinstance(data, dict): - return {} - extracted: dict = {} - - raw_protocol = ( - data.get("protocolo") - or data.get("numero_protocolo") - or data.get("codigo") - ) - protocol = self._extract_review_protocol_from_text(str(raw_protocol or "")) - if protocol: - extracted["protocolo"] = protocol - - new_datetime = self._normalize_review_datetime_text(data.get("nova_data_hora")) - if new_datetime: - extracted["nova_data_hora"] = new_datetime - - reason = str(data.get("motivo") or "").strip(" .;") - if reason: - extracted["motivo"] = reason - return extracted + return self.normalizer.normalize_review_management_fields(data) def _normalize_order_fields(self, data) -> dict: - if not isinstance(data, dict): - return {} - extracted: dict = {} - cpf = self._normalize_cpf(data.get("cpf")) - if cpf: - extracted["cpf"] = cpf - value = self._normalize_positive_number(data.get("valor_veiculo")) - if value: - extracted["valor_veiculo"] = round(value, 2) - return extracted + return self.normalizer.normalize_order_fields(data) def _normalize_cancel_order_fields(self, data) -> dict: - if not isinstance(data, dict): - return {} - extracted: dict = {} - order_number = str(data.get("numero_pedido") or "").strip().upper() - if order_number and re.fullmatch(r"PED-[A-Z0-9\-]+", order_number): - extracted["numero_pedido"] = order_number - reason = str(data.get("motivo") or "").strip(" .;") - if reason: - extracted["motivo"] = reason - return extracted + return self.normalizer.normalize_cancel_order_fields(data) def _normalize_intents(self, data) -> dict: - if not isinstance(data, dict): - data = {} - return { - "review_schedule": bool(self._normalize_bool(data.get("review_schedule"))), - "review_list": bool(self._normalize_bool(data.get("review_list"))), - "review_cancel": bool(self._normalize_bool(data.get("review_cancel"))), - "review_reschedule": bool(self._normalize_bool(data.get("review_reschedule"))), - "order_create": bool(self._normalize_bool(data.get("order_create"))), - "order_cancel": bool(self._normalize_bool(data.get("order_cancel"))), - } + return self.normalizer.normalize_intents(data) def _has_operational_intent(self, extracted_entities: dict | None) -> bool: - if not isinstance(extracted_entities, dict): - return False - intents = self._normalize_intents(extracted_entities.get("intents")) - if any(intents.values()): - return True - return any( - bool(extracted_entities.get(key)) - for key in ("review_fields", "review_management_fields", "order_fields", "cancel_order_fields") - ) + return self.normalizer.has_operational_intent(extracted_entities) def _try_prefill_review_fields_from_memory(self, user_id: int | None, payload: dict) -> None: - if user_id is None: - return - context = self._get_user_context(user_id) - if not context: - return - memory = context.get("generic_memory", {}) - if payload.get("placa") is None: - plate = self._normalize_plate(memory.get("placa")) - if plate: - payload["placa"] = plate + self.policy.try_prefill_review_fields_from_memory(user_id=user_id, payload=payload) def _queue_order(self, user_id: int | None, domain: str, order_message: str) -> None: - self._queue_order_with_memory_seed( - user_id=user_id, - domain=domain, - order_message=order_message, - memory_seed=self._new_tab_memory(user_id=user_id), - ) + self.policy.queue_order(user_id=user_id, domain=domain, order_message=order_message) def _queue_order_with_memory_seed( self, @@ -1101,29 +623,18 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin): order_message: str, memory_seed: dict | None = None, ) -> None: - context = self._get_user_context(user_id) - if not context: - return - if domain == "general": - return - queue = context.setdefault("order_queue", []) - queue.append( - { - "domain": domain, - "message": (order_message or "").strip(), - "memory_seed": dict(memory_seed or self._new_tab_memory(user_id=user_id)), - "created_at": datetime.utcnow().isoformat(), - } + self.policy.queue_order_with_memory_seed( + user_id=user_id, + domain=domain, + order_message=order_message, + memory_seed=memory_seed, ) + def _build_order_memory_seed(self, user_id: int | None, order: dict | None = None) -> dict: + return self.policy.build_order_memory_seed(user_id=user_id, order=order) + def _pop_next_order(self, user_id: int | None) -> dict | None: - context = self._get_user_context(user_id) - if not context: - return None - queue = context.setdefault("order_queue", []) - if not queue: - return None - return queue.pop(0) + return self.policy.pop_next_order(user_id) def _prepare_message_for_single_order( self, @@ -1131,466 +642,74 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin): user_id: int | None, routing_plan: dict | None = None, ) -> tuple[str, str | None, str | None]: - context = self._get_user_context(user_id) - if not context: - return message, None, None - - queue_notice = None - active_domain = context.get("active_domain", "general") - - orders_raw = (routing_plan or {}).get("orders") if isinstance(routing_plan, dict) else None - extracted_orders: list[dict] = [] - if isinstance(orders_raw, list): - for item in orders_raw: - if not isinstance(item, dict): - continue - domain = str(item.get("domain") or "general").strip().lower() - if domain not in {"review", "sales", "general"}: - domain = "general" - segment = str(item.get("message") or "").strip() - if segment: - extracted_orders.append({"domain": domain, "message": segment}) - if not extracted_orders: - extracted_orders = [{"domain": "general", "message": (message or "").strip()}] - - if ( - len(extracted_orders) == 2 - and all(order["domain"] != "general" for order in extracted_orders) - and not self._has_open_flow(user_id=user_id, domain=active_domain) - ): - self._store_pending_order_selection(user_id=user_id, orders=extracted_orders) - return message, None, self._render_order_selection_prompt(extracted_orders) - - if len(extracted_orders) <= 1: - inferred = extracted_orders[0]["domain"] - if ( - inferred != "general" - and inferred != active_domain - and self._has_open_flow(user_id=user_id, domain=active_domain) - ): - self._queue_order(user_id=user_id, domain=inferred, order_message=message) - queue_hint = self._render_queue_notice(1) - return ( - message, - None, - ( - f"{self._render_open_flow_prompt(user_id=user_id, domain=active_domain)}\n{queue_hint}" - if queue_hint - else self._render_open_flow_prompt(user_id=user_id, domain=active_domain) - ), - ) - return message, None, None - - if self._has_open_flow(user_id=user_id, domain=active_domain): - queued_count = 0 - for queued in extracted_orders: - if queued["domain"] != active_domain: - self._queue_order(user_id=user_id, domain=queued["domain"], order_message=queued["message"]) - queued_count += 1 - queue_hint = self._render_queue_notice(queued_count) - return ( - message, - None, - ( - f"{self._render_open_flow_prompt(user_id=user_id, domain=active_domain)}\n{queue_hint}" - if queue_hint - else self._render_open_flow_prompt(user_id=user_id, domain=active_domain) - ), - ) - - first = extracted_orders[0] - queued_count = 0 - for queued in extracted_orders[1:]: - self._queue_order(user_id=user_id, domain=queued["domain"], order_message=queued["message"]) - queued_count += 1 - context["active_domain"] = first["domain"] - - queue_notice = self._render_queue_notice(queued_count) - return first["message"], queue_notice, None + return self.policy.prepare_message_for_single_order(message=message, user_id=user_id, routing_plan=routing_plan) def _compose_order_aware_response(self, response: str, user_id: int | None, queue_notice: str | None = None) -> str: - lines = [] - if queue_notice: - lines.append(queue_notice) - lines.append(response) - return "\n".join(lines) + return self.policy.compose_order_aware_response(response=response, queue_notice=queue_notice) def _store_pending_order_selection(self, user_id: int | None, orders: list[dict]) -> None: - context = self._get_user_context(user_id) - if not context: - return - context["pending_order_selection"] = { - "orders": [ - { - "domain": order["domain"], - "message": order["message"], - "memory_seed": self._new_tab_memory(user_id=user_id), - } - for order in orders[:2] - ], - "expires_at": datetime.utcnow() + timedelta(minutes=PENDING_ORDER_SELECTION_TTL_MINUTES), - } + self.policy.store_pending_order_selection(user_id=user_id, orders=orders) def _render_order_selection_prompt(self, orders: list[dict]) -> str: - if len(orders) < 2: - return "Qual das acoes voce quer iniciar primeiro?" - first_label = self._describe_order_selection_option(orders[0]) - second_label = self._describe_order_selection_option(orders[1]) - return ( - "Identifiquei duas acoes na sua mensagem:\n" - f"1. {first_label}\n" - f"2. {second_label}\n" - "Qual delas voce quer iniciar primeiro? Se for indiferente, eu escolho." - ) + return self.policy.render_order_selection_prompt(orders) def _describe_order_selection_option(self, order: dict) -> str: - domain = str(order.get("domain") or "general") - message = str(order.get("message") or "").strip() - domain_prefix = { - "review": "Revisao", - "sales": "Venda", - "general": "Atendimento", - }.get(domain, "Atendimento") - return f"{domain_prefix}: {message}" + return self.policy.describe_order_selection_option(order) def _contains_any_term(self, text: str, terms: set[str]) -> bool: - return any(term in text for term in terms) + return self.policy.contains_any_term(text, terms) def _strip_choice_message(self, text: str) -> str: - cleaned = (text or "").strip() - trailing_chars = ".!?,;:" - while cleaned and cleaned[-1] in trailing_chars: - cleaned = cleaned[:-1].rstrip() - return cleaned + return self.policy.strip_choice_message(text) def _remove_order_selection_reset_prefix(self, message: str) -> str: - raw = (message or "").strip() - normalized = self._normalize_text(raw) - prefixes = ( - "esqueca tudo agora", - "esqueca tudo", - "esquece tudo agora", - "esquece tudo", - ) - for prefix in prefixes: - if normalized.startswith(prefix): - return raw[len(prefix):].lstrip(" ,.:;-") - return raw + return self.policy.remove_order_selection_reset_prefix(message) def _is_order_selection_reset_message(self, message: str) -> bool: - normalized = self._normalize_text(message).strip() - reset_terms = { - "esqueca tudo", - "esqueca tudo agora", - "esquece tudo", - "esquece tudo agora", - "ignora isso", - "ignore isso", - "deixa isso", - "deixa pra la", - "deixa para la", - "novo assunto", - "muda de assunto", - "vamos comecar de novo", - "comecar de novo", - "reiniciar", - "resetar conversa", - } - return self._contains_any_term(normalized, reset_terms) + return self.policy.is_order_selection_reset_message(message) def _looks_like_fresh_operational_request(self, message: str) -> bool: - normalized = self._normalize_text(message).strip() - if len(normalized) < 15: - return False - operational_terms = { - "agendar", - "revisao", - "cancelar", - "pedido", - "comprar", - "compra", - "carro", - "veiculo", - "remarcar", - "tambem", - } - return self._contains_any_term(normalized, operational_terms) + return self.policy.looks_like_fresh_operational_request(message) def _detect_selected_order_index(self, message: str, orders: list[dict]) -> tuple[int | None, bool]: - normalized = self._strip_choice_message(self._normalize_text(message)) - - indifferent_tokens = { - "tanto faz", - "indiferente", - "qualquer um", - "qualquer uma", - "voce escolhe", - "pode escolher", - "fica a seu criterio", - } - if normalized in indifferent_tokens: - return 0, True - - first_tokens = {"1", "primeiro", "primeira", "opcao 1", "acao 1", "pedido 1"} - second_tokens = {"2", "segundo", "segunda", "opcao 2", "acao 2", "pedido 2"} - if normalized in first_tokens: - return 0, False - if normalized in second_tokens: - return 1, False - - review_keywords = { - "revisao", - "agendamento", - "agendar", - "remarcar", - "pos venda", - } - sales_keywords = { - "venda", - "compra", - "comprar", - "pedido", - "cancelamento", - "cancelar", - "carro", - "veiculo", - } - - review_matches = [index for index, order in enumerate(orders) if order.get("domain") == "review"] - sales_matches = [index for index, order in enumerate(orders) if order.get("domain") == "sales"] - has_review_signal = self._contains_any_term(normalized, review_keywords) - has_sales_signal = self._contains_any_term(normalized, sales_keywords) - - if len(review_matches) == 1 and has_review_signal and not has_sales_signal: - return review_matches[0], False - if len(sales_matches) == 1 and has_sales_signal and not has_review_signal: - return sales_matches[0], False - - return None, False + return self.policy.detect_selected_order_index(message=message, orders=orders) async def _try_resolve_pending_order_selection( self, message: str, user_id: int | None, ) -> str | None: - context = self._get_user_context(user_id) - if not context: - return None - - pending = context.get("pending_order_selection") - if not isinstance(pending, dict): - return None - if pending.get("expires_at") and pending["expires_at"] < datetime.utcnow(): - context["pending_order_selection"] = None - return None - - orders = pending.get("orders") or [] - if len(orders) < 2: - context["pending_order_selection"] = None - return None - - if self._is_order_selection_reset_message(message): - self._clear_user_conversation_state(user_id=user_id) - cleaned_message = self._remove_order_selection_reset_prefix(message) - if not cleaned_message: - return "Tudo bem. Limpei o contexto atual. Pode me dizer o que voce quer fazer agora?" - return await self.handle_message(cleaned_message, user_id=user_id) - - selected_index, auto_selected = self._detect_selected_order_index(message=message, orders=orders) - if selected_index is None: - if self._looks_like_fresh_operational_request(message): - context["pending_order_selection"] = None - return None - return self._render_order_selection_prompt(orders) - - selected_order = orders[selected_index] - remaining_order = orders[1 - selected_index] - context["pending_order_selection"] = None - self._queue_order_with_memory_seed( - user_id=user_id, - domain=remaining_order["domain"], - order_message=remaining_order["message"], - memory_seed=remaining_order.get("memory_seed"), - ) - - intro = ( - f"Vou escolher e comecar por: {self._describe_order_selection_option(selected_order)}" - if auto_selected - else f"Perfeito. Vou comecar por: {self._describe_order_selection_option(selected_order)}" - ) - next_response = await self.handle_message(str(selected_order.get("message") or ""), user_id=user_id) - return f"{intro}\n{next_response}" + return await self.policy.try_resolve_pending_order_selection(message=message, user_id=user_id) def _render_queue_notice(self, queued_count: int) -> str | None: - if queued_count <= 0: - return None - if queued_count == 1: - return "Anotei mais 1 pedido e sigo nele quando voce disser 'continuar'." - return f"Anotei mais {queued_count} pedidos e sigo neles conforme voce for dizendo 'continuar'." + return self.policy.render_queue_notice(queued_count) def _render_open_flow_prompt(self, user_id: int | None, domain: str) -> str: - if domain == "review" and user_id is not None: - draft = self.state.get_entry("pending_review_drafts", user_id, expire=True) - if draft: - missing = [field for field in REVIEW_REQUIRED_FIELDS if field not in draft.get("payload", {})] - if missing: - return self._render_missing_review_fields_prompt(missing) - - management_draft = self.state.get_entry("pending_review_management_drafts", user_id, expire=True) - if management_draft: - action = management_draft.get("action", "cancel") - payload = management_draft.get("payload", {}) - if action == "reschedule": - missing = [field for field in ("protocolo", "nova_data_hora") if field not in payload] - if missing: - return self._render_missing_review_reschedule_fields_prompt(missing) - else: - missing = [field for field in ("protocolo",) if field not in payload] - if missing: - return self._render_missing_review_cancel_fields_prompt(missing) - - pending = self.state.get_entry("pending_review_confirmations", user_id, expire=True) - if pending: - return "Antes de mudar de assunto, me confirme se podemos concluir seu agendamento de revisao." - reuse_pending = self.state.get_entry("pending_review_reuse_confirmations", user_id, expire=True) - if reuse_pending: - return self._render_review_reuse_question() - if domain == "sales" and user_id is not None: - draft = self.state.get_entry("pending_order_drafts", user_id, expire=True) - if draft: - missing = [field for field in ORDER_REQUIRED_FIELDS if field not in draft.get("payload", {})] - if missing: - return self._render_missing_order_fields_prompt(missing) - cancel_draft = self.state.get_entry("pending_cancel_order_drafts", user_id, expire=True) - if cancel_draft: - missing = [field for field in CANCEL_ORDER_REQUIRED_FIELDS if field not in cancel_draft.get("payload", {})] - if missing: - return self._render_missing_cancel_order_fields_prompt(missing) - - return "Vamos concluir este assunto primeiro e ja sigo com o proximo em seguida." + return self.policy.render_open_flow_prompt(user_id=user_id, domain=domain) def _build_next_order_transition(self, domain: str) -> str: - if domain == "sales": - return "Agora, sobre a compra do veiculo:" - if domain == "review": - return "Agora, sobre o agendamento da revisao:" - return "Agora, sobre o proximo assunto:" + return self.policy.build_next_order_transition(domain) async def _maybe_auto_advance_next_order(self, base_response: str, user_id: int | None) -> str: - context = self._get_user_context(user_id) - if not context: - return base_response - - if context.get("pending_switch"): - return base_response - - active_domain = context.get("active_domain", "general") - if self._has_open_flow(user_id=user_id, domain=active_domain): - return base_response - - next_order = self._pop_next_order(user_id=user_id) - if not next_order: - return base_response - - context["pending_switch"] = { - "source_domain": context.get("active_domain", "general"), - "target_domain": next_order["domain"], - "queued_message": next_order["message"], - "memory_seed": dict(next_order.get("memory_seed") or self._new_tab_memory(user_id=user_id)), - "expires_at": datetime.utcnow() + timedelta(minutes=15), - } - transition = self._build_next_order_transition(next_order["domain"]) - return ( - f"{base_response}\n\n" - f"{transition}\n" - "Tenho um proximo pedido na fila. Quando quiser, diga 'continuar' para eu seguir nele." - ) + return await self.policy.maybe_auto_advance_next_order(base_response=base_response, user_id=user_id) def _domain_from_intents(self, intents: dict | None) -> str: - normalized = self._normalize_intents(intents) - review_score = ( - int(normalized.get("review_schedule", False)) - + int(normalized.get("review_list", False)) - + int(normalized.get("review_cancel", False)) - + int(normalized.get("review_reschedule", False)) - ) - sales_score = int(normalized.get("order_create", False)) + int(normalized.get("order_cancel", False)) - if review_score > sales_score and review_score > 0: - return "review" - if sales_score > review_score and sales_score > 0: - return "sales" - return "general" + return self.policy.domain_from_intents(intents) def _is_context_switch_confirmation(self, message: str) -> bool: - return self._is_affirmative_message(message) or self._is_negative_message(message) + return self.policy.is_context_switch_confirmation(message) def _is_continue_queue_message(self, message: str) -> bool: - normalized = self._normalize_text(message).strip() - normalized = re.sub(r"[.!?,;:]+$", "", normalized) - return normalized in {"continuar", "pode continuar", "seguir", "pode seguir", "proximo", "segue"} + return self.policy.is_continue_queue_message(message) async def _try_continue_queued_order(self, message: str, user_id: int | None) -> str | None: - context = self._get_user_context(user_id) - if not context: - return None - - pending_switch = context.get("pending_switch") - if not isinstance(pending_switch, dict): - return None - if pending_switch.get("expires_at") and pending_switch["expires_at"] < datetime.utcnow(): - context["pending_switch"] = None - return None - queued_message = str(pending_switch.get("queued_message") or "").strip() - if not queued_message: - return None - - if self._is_negative_message(message): - context["pending_switch"] = None - return "Tudo bem. Mantive o proximo pedido fora da fila por enquanto." - if not (self._is_continue_queue_message(message) or self._is_affirmative_message(message)): - return None - - target_domain = str(pending_switch.get("target_domain") or "general") - memory_seed = dict(pending_switch.get("memory_seed") or {}) - self._apply_domain_switch(user_id=user_id, target_domain=target_domain) - refreshed = self._get_user_context(user_id) - if refreshed is not None: - refreshed["generic_memory"] = memory_seed - - transition = self._build_next_order_transition(target_domain) - next_response = await self.handle_message(queued_message, user_id=user_id) - return f"{transition}\n{next_response}" + return await self.policy.try_continue_queued_order(message=message, user_id=user_id) def _has_open_flow(self, user_id: int | None, domain: str) -> bool: - if user_id is None: - return False - if domain == "review": - return bool( - self.state.get_entry("pending_review_drafts", user_id, expire=True) - or self.state.get_entry("pending_review_confirmations", user_id, expire=True) - or self.state.get_entry("pending_review_management_drafts", user_id, expire=True) - or self.state.get_entry("pending_review_reuse_confirmations", user_id, expire=True) - ) - if domain == "sales": - return bool( - self.state.get_entry("pending_order_drafts", user_id, expire=True) - or self.state.get_entry("pending_cancel_order_drafts", user_id, expire=True) - ) - return False + return self.policy.has_open_flow(user_id=user_id, domain=domain) def _apply_domain_switch(self, user_id: int | None, target_domain: str) -> None: - context = self._get_user_context(user_id) - if not context: - return - previous_domain = context.get("active_domain", "general") - if previous_domain == "review": - self._reset_pending_review_states(user_id=user_id) - if previous_domain == "sales": - self._reset_pending_order_states(user_id=user_id) - context["active_domain"] = target_domain - context["generic_memory"] = self._new_tab_memory(user_id=user_id) - context["pending_order_selection"] = None - context["pending_switch"] = None + self.policy.apply_domain_switch(user_id=user_id, target_domain=target_domain) def _handle_context_switch( self, @@ -1598,109 +717,45 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin): user_id: int | None, target_domain_hint: str = "general", ) -> str | None: - context = self._get_user_context(user_id) - if not context: - return None - - pending_switch = context.get("pending_switch") - if pending_switch: - if pending_switch["expires_at"] < datetime.utcnow(): - context["pending_switch"] = None - elif self._is_context_switch_confirmation(message): - if self._is_affirmative_message(message): - target_domain = pending_switch["target_domain"] - self._apply_domain_switch(user_id=user_id, target_domain=target_domain) - return self._render_context_switched_message(target_domain=target_domain) - context["pending_switch"] = None - return "Perfeito, vamos continuar no fluxo atual." - - pending_order_selection = context.get("pending_order_selection") - if pending_order_selection and pending_order_selection.get("expires_at") < datetime.utcnow(): - context["pending_order_selection"] = None - - current_domain = context.get("active_domain", "general") - target_domain = target_domain_hint - if target_domain == "general" or target_domain == current_domain: - return None - if not self._has_open_flow(user_id=user_id, domain=current_domain): - return None - - context["pending_switch"] = { - "source_domain": current_domain, - "target_domain": target_domain, - "expires_at": datetime.utcnow() + timedelta(minutes=15), - } - return self._render_context_switch_confirmation( - source_domain=current_domain, - target_domain=target_domain, - ) + return self.policy.handle_context_switch(message=message, user_id=user_id, target_domain_hint=target_domain_hint) def _update_active_domain(self, user_id: int | None, domain_hint: str = "general") -> None: - context = self._get_user_context(user_id) - if not context: - return - detected = domain_hint - if detected != "general": - context["active_domain"] = detected + self.policy.update_active_domain(user_id=user_id, domain_hint=domain_hint) def _domain_label(self, domain: str) -> str: - labels = { - "review": "agendamento de revisao", - "sales": "compra de veiculo", - "general": "atendimento geral", - } - return labels.get(domain, "atendimento") + return self.policy.domain_label(domain) def _render_context_switch_confirmation(self, source_domain: str, target_domain: str) -> str: - return ( - f"Entendi que voce quer sair de {self._domain_label(source_domain)} " - f"e ir para {self._domain_label(target_domain)}. Tem certeza?" - ) + return self.policy.render_context_switch_confirmation(source_domain=source_domain, target_domain=target_domain) def _render_context_switched_message(self, target_domain: str) -> str: - return f"Certo, contexto anterior encerrado. Vamos seguir com {self._domain_label(target_domain)}." + return self.policy.render_context_switched_message(target_domain=target_domain) def _build_context_summary(self, user_id: int | None) -> str: - context = self._get_user_context(user_id) - if not context: - return "Contexto de conversa: sem contexto ativo." - - domain = context.get("active_domain", "general") - memory = context.get("generic_memory", {}) - order_queue = context.get("order_queue", []) - summary = [f"Contexto de conversa ativo: {self._domain_label(domain)}."] - if memory: - summary.append(f"Memoria generica temporaria: {memory}.") - if order_queue: - summary.append(f"Fila de pedidos pendentes: {len(order_queue)}.") - return " ".join(summary) + return self.policy.build_context_summary(user_id=user_id) def _should_use_deterministic_response(self, tool_name: str) -> bool: - return tool_name in DETERMINISTIC_RESPONSE_TOOLS + return self.tool_executor.should_use_deterministic_response(tool_name) def _normalize_text(self, text: str) -> str: - normalized = unicodedata.normalize("NFKD", text or "") - ascii_text = normalized.encode("ascii", "ignore").decode("ascii") - return ascii_text.lower() + return self.normalizer.normalize_text(text) def _is_low_value_response(self, text: str) -> bool: return text.strip().lower() in LOW_VALUE_RESPONSES def _is_affirmative_message(self, text: str) -> bool: - normalized = self._normalize_text(text).strip() - normalized = re.sub(r"[.!?,;:]+$", "", normalized) + normalized = self._normalize_text(text).strip().rstrip(".!?,;:") return normalized in {"sim", "pode", "ok", "confirmo", "aceito", "fechado", "pode sim", "tenho", "tenho sim"} def _is_negative_message(self, text: str) -> bool: - normalized = self._normalize_text(text).strip() - normalized = re.sub(r"[.!?,;:]+$", "", normalized) + normalized = self._normalize_text(text).strip().rstrip(".!?,;:") return ( normalized in {"nao", "nao quero", "prefiro outro", "outro horario"} or normalized.startswith("nao") ) def _extract_time_only(self, text: str) -> str | None: - return self._extract_hhmm_from_text(text) + return self.normalizer.extract_hhmm_from_text(text) def _merge_date_with_time(self, base_iso: str, new_time_hhmm: str) -> str | None: try: @@ -1763,7 +818,7 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin): payload = dict(pending["payload"]) payload["data_hora"] = new_data_hora try: - tool_result = await self.registry.execute( + tool_result = await self.tool_executor.execute( "agendar_revisao", payload, user_id=user_id, @@ -1785,7 +840,7 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin): if not self._is_affirmative_message(message): return None try: - tool_result = await self.registry.execute( + tool_result = await self.tool_executor.execute( "agendar_revisao", pending["payload"], user_id=user_id, @@ -1831,14 +886,7 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin): ) def _http_exception_detail(self, exc: HTTPException) -> str: - detail = exc.detail - if isinstance(detail, str): - return detail - if isinstance(detail, dict): - message = str(detail.get("message") or "").strip() - if message: - return message - return "Nao foi possivel concluir a operacao solicitada." + return self.tool_executor.http_exception_detail(exc) def _fallback_format_tool_result(self, tool_name: str, tool_result) -> str: - return fallback_format_tool_result(tool_name=tool_name, tool_result=tool_result) + return self.tool_executor.fallback_format_tool_result(tool_name=tool_name, tool_result=tool_result) diff --git a/app/services/orchestration/tool_executor.py b/app/services/orchestration/tool_executor.py new file mode 100644 index 0000000..5b8ad82 --- /dev/null +++ b/app/services/orchestration/tool_executor.py @@ -0,0 +1,28 @@ +from fastapi import HTTPException + +from app.services.orchestration.orchestrator_config import DETERMINISTIC_RESPONSE_TOOLS +from app.services.orchestration.response_formatter import fallback_format_tool_result + + +class ToolExecutor: + def __init__(self, registry): + self.registry = registry + + async def execute(self, tool_name: str, arguments: dict, user_id: int | None = None): + return await self.registry.execute(tool_name, arguments, user_id=user_id) + + def should_use_deterministic_response(self, tool_name: str) -> bool: + return tool_name in DETERMINISTIC_RESPONSE_TOOLS + + def http_exception_detail(self, exc: HTTPException) -> str: + detail = exc.detail + if isinstance(detail, str): + return detail + if isinstance(detail, dict): + message = str(detail.get("message") or "").strip() + if message: + return message + return "Nao foi possivel concluir a operacao solicitada." + + def fallback_format_tool_result(self, tool_name: str, tool_result) -> str: + return fallback_format_tool_result(tool_name=tool_name, tool_result=tool_result)