From 8cf79174ee917fe7efdcd84a7b373e38d50e1059 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Vitor=20Hugo=20Belorio=20Sim=C3=A3o?= Date: Tue, 10 Mar 2026 18:04:11 -0300 Subject: [PATCH] =?UTF-8?q?=F0=9F=A7=A0=20feat(orquestrador):=20deixar=20o?= =?UTF-8?q?=20modelo=20decidir=20o=20turno=20e=20limitar=20regex=20a=20for?= =?UTF-8?q?malizacao=20tecnica?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Introduz o contrato TurnDecision e a extracao estruturada por turno no planner para que intent, domain, action, selecao e resposta venham do modelo, com validacao Pydantic e fallback previsivel quando o JSON vier invalido. Tambem extrai a normalizacao tecnica para um modulo dedicado e passa a usar regex apenas para formalizar CPF, placa, protocolos, datas e outros formatos estruturados, reduzindo heuristicas semanticas dentro do normalizador, da policy e dos fluxos de revisao. --- app/services/flows/review_flow.py | 26 +- .../orchestration/conversation_policy.py | 91 +++- .../orchestration/entity_normalizer.py | 222 ++------- app/services/orchestration/message_planner.py | 44 ++ .../orchestration/orquestrador_service.py | 338 ++++++++++++- .../orchestration/technical_normalizer.py | 245 ++++++++++ app/services/orchestration/turn_decision.py | 72 +++ tests/test_turn_decision_contract.py | 452 ++++++++++++++++++ 8 files changed, 1282 insertions(+), 208 deletions(-) create mode 100644 app/services/orchestration/technical_normalizer.py create mode 100644 app/services/orchestration/turn_decision.py create mode 100644 tests/test_turn_decision_contract.py diff --git a/app/services/flows/review_flow.py b/app/services/flows/review_flow.py index 42ae0e4..41248c4 100644 --- a/app/services/flows/review_flow.py +++ b/app/services/flows/review_flow.py @@ -10,22 +10,28 @@ from app.services.orchestration.orchestrator_config import ( ) +# Esse mixin concentra os fluxos incrementais de revisao e pos-venda. 
class ReviewFlowMixin: + def _decision_intent(self, turn_decision: dict | None) -> str: + return str((turn_decision or {}).get("intent") or "").strip().lower() + async def _try_handle_review_management( self, message: str, user_id: int | None, extracted_fields: dict | None = None, intents: dict | None = None, + turn_decision: dict | None = None, ) -> str | None: if user_id is None: return None normalized_intents = self._normalize_intents(intents) draft = self.state.get_entry("pending_review_management_drafts", user_id, expire=True) + decision_intent = self._decision_intent(turn_decision) - has_list_intent = normalized_intents.get("review_list", False) - has_cancel_intent = normalized_intents.get("review_cancel", False) - has_reschedule_intent = normalized_intents.get("review_reschedule", False) + has_list_intent = decision_intent == "review_list" or normalized_intents.get("review_list", False) + has_cancel_intent = decision_intent == "review_cancel" or normalized_intents.get("review_cancel", False) + has_reschedule_intent = decision_intent == "review_reschedule" or normalized_intents.get("review_reschedule", False) if has_list_intent: self._reset_pending_review_states(user_id=user_id) @@ -147,6 +153,8 @@ class ReviewFlowMixin: def _store_last_review_package(self, user_id: int | None, payload: dict | None) -> None: if user_id is None or not isinstance(payload, dict): return + # Guarda um pacote reutilizavel do ultimo veiculo informado + # para reduzir repeticao em novos agendamentos. 
package = { "placa": payload.get("placa"), "modelo": payload.get("modelo"), @@ -182,14 +190,17 @@ class ReviewFlowMixin: user_id: int | None, extracted_fields: dict | None = None, intents: dict | None = None, + turn_decision: dict | None = None, ) -> str | None: if user_id is None: return None normalized_intents = self._normalize_intents(intents) - has_intent = normalized_intents.get("review_schedule", False) + decision_intent = self._decision_intent(turn_decision) + has_intent = decision_intent == "review_schedule" or normalized_intents.get("review_schedule", False) has_management_intent = ( - normalized_intents.get("review_list", False) + decision_intent in {"review_list", "review_cancel", "review_reschedule"} + or normalized_intents.get("review_list", False) or normalized_intents.get("review_cancel", False) or normalized_intents.get("review_reschedule", False) ) @@ -245,7 +256,8 @@ class ReviewFlowMixin: draft and not has_intent and ( - normalized_intents.get("order_create", False) + decision_intent in {"order_create", "order_cancel"} + or normalized_intents.get("order_create", False) or normalized_intents.get("order_cancel", False) ) and not extracted @@ -257,6 +269,8 @@ class ReviewFlowMixin: return None if draft is None: + # Cria um draft com TTL para permitir coleta do agendamento + # em varias mensagens sem perder o progresso. 
draft = { "payload": {}, "expires_at": datetime.utcnow() + timedelta(minutes=PENDING_REVIEW_DRAFT_TTL_MINUTES), diff --git a/app/services/orchestration/conversation_policy.py b/app/services/orchestration/conversation_policy.py index 7755a4a..7af354f 100644 --- a/app/services/orchestration/conversation_policy.py +++ b/app/services/orchestration/conversation_policy.py @@ -17,6 +17,19 @@ class ConversationPolicy: def __init__(self, service: "OrquestradorService"): self.service = service + def _decision_action(self, turn_decision: dict | None) -> str: + return str((turn_decision or {}).get("action") or "").strip().lower() + + def _decision_intent(self, turn_decision: dict | None) -> str: + return str((turn_decision or {}).get("intent") or "").strip().lower() + + def _decision_domain(self, turn_decision: dict | None) -> str: + return str((turn_decision or {}).get("domain") or "").strip().lower() + + def _decision_selection_index(self, turn_decision: dict | None) -> int | None: + value = (turn_decision or {}).get("selection_index") + return value if isinstance(value, int) and value >= 0 else None + # Essa função serve para reaproveitar informações já informadas antes, evitando pedir novamente ao usuário. 
def try_prefill_review_fields_from_memory(self, user_id: int | None, payload: dict) -> None: if user_id is None: @@ -289,7 +302,11 @@ class ConversationPolicy: # Ajuda a perceber quando o usuário talvez tenha mudado de assunto sem responder à pergunta de escolha - def looks_like_fresh_operational_request(self, message: str) -> bool: + def looks_like_fresh_operational_request(self, message: str, turn_decision: dict | None = None) -> bool: + decision_domain = self._decision_domain(turn_decision) + decision_intent = self._decision_intent(turn_decision) + if decision_domain in {"review", "sales"} or decision_intent not in {"", "general"}: + return True normalized = self.service.normalizer.normalize_text(message).strip() if len(normalized) < 15: return False @@ -360,7 +377,16 @@ class ConversationPolicy: # Interpreta a resposta do usuário na etapa de seleção. - def detect_selected_order_index(self, message: str, orders: list[dict]) -> tuple[int | None, bool]: + def detect_selected_order_index( + self, + message: str, + orders: list[dict], + turn_decision: dict | None = None, + ) -> tuple[int | None, bool]: + selection_index = self._decision_selection_index(turn_decision) + if selection_index is not None and 0 <= selection_index < len(orders): + return selection_index, False + normalized = self.strip_choice_message(self.service.normalizer.normalize_text(message)) indifferent_tokens = { "tanto faz", @@ -378,6 +404,12 @@ class ConversationPolicy: if normalized in {"2", "segundo", "segunda", "opcao 2", "acao 2", "pedido 2"}: return 1, False + decision_domain = self._decision_domain(turn_decision) + if len(orders) >= 2 and decision_domain in {"review", "sales"}: + matches = [index for index, order in enumerate(orders) if order.get("domain") == decision_domain] + if len(matches) == 1: + return matches[0], False + review_matches = [index for index, order in enumerate(orders) if order.get("domain") == "review"] sales_matches = [index for index, order in enumerate(orders) if 
order.get("domain") == "sales"] has_review_signal = self.contains_any_term(normalized, {"revisao", "agendamento", "agendar", "remarcar", "pos venda"}) @@ -391,7 +423,12 @@ class ConversationPolicy: # É a função que efetivamente trata a resposta do usuário quando você perguntou “qual pedido quer fazer primeiro?”. - async def try_resolve_pending_order_selection(self, message: str, user_id: int | None) -> str | None: + async def try_resolve_pending_order_selection( + self, + message: str, + user_id: int | None, + turn_decision: dict | None = None, + ) -> str | None: context = self.service._get_user_context(user_id) if not context: return None @@ -406,16 +443,21 @@ class ConversationPolicy: context["pending_order_selection"] = None return None - if self.is_order_selection_reset_message(message): + decision_action = self._decision_action(turn_decision) + if decision_action == "clear_context" or self.is_order_selection_reset_message(message): self.service._clear_user_conversation_state(user_id=user_id) cleaned_message = self.remove_order_selection_reset_prefix(message) if not cleaned_message: return "Tudo bem. Limpei o contexto atual. Pode me dizer o que voce quer fazer agora?" return await self.service.handle_message(cleaned_message, user_id=user_id) - selected_index, auto_selected = self.detect_selected_order_index(message=message, orders=orders) + selected_index, auto_selected = self.detect_selected_order_index( + message=message, + orders=orders, + turn_decision=turn_decision, + ) if selected_index is None: - if self.looks_like_fresh_operational_request(message): + if self.looks_like_fresh_operational_request(message, turn_decision=turn_decision): context["pending_order_selection"] = None return None return self.render_order_selection_prompt(orders) @@ -548,19 +590,30 @@ class ConversationPolicy: # Detecta comandos de continuação. 
- def is_context_switch_confirmation(self, message: str) -> bool: + def is_context_switch_confirmation(self, message: str, turn_decision: dict | None = None) -> bool: + if self._decision_action(turn_decision) in {"continue_queue", "cancel_active_flow", "clear_context", "discard_queue"}: + return True + if self._decision_domain(turn_decision) in {"review", "sales"}: + return True return self.service._is_affirmative_message(message) or self.service._is_negative_message(message) # Executa o próximo pedido da fila quando o usuário disser “continuar”. - def is_continue_queue_message(self, message: str) -> bool: + def is_continue_queue_message(self, message: str, turn_decision: dict | None = None) -> bool: + if self._decision_action(turn_decision) == "continue_queue" or self._decision_intent(turn_decision) == "queue_continue": + return True normalized = self.service.normalizer.normalize_text(message).strip() normalized = re.sub(r"[.!?,;:]+$", "", normalized) return normalized in {"continuar", "pode continuar", "seguir", "pode seguir", "proximo", "segue"} # Executa o próximo pedido da fila quando o usuário disser “continuar”. - async def try_continue_queued_order(self, message: str, user_id: int | None) -> str | None: + async def try_continue_queued_order( + self, + message: str, + user_id: int | None, + turn_decision: dict | None = None, + ) -> str | None: context = self.service._get_user_context(user_id) if not context: return None @@ -574,10 +627,14 @@ class ConversationPolicy: if not queued_message: return None - if self.service._is_negative_message(message): + decision_action = self._decision_action(turn_decision) + if self.service._is_negative_message(message) and decision_action != "continue_queue": context["pending_switch"] = None return "Tudo bem. Mantive o proximo pedido fora da fila por enquanto." 
- if not (self.is_continue_queue_message(message) or self.service._is_affirmative_message(message)): + if not ( + self.is_continue_queue_message(message, turn_decision=turn_decision) + or self.service._is_affirmative_message(message) + ): return None target_domain = str(pending_switch.get("target_domain") or "general") @@ -627,7 +684,13 @@ class ConversationPolicy: # Controla a confirmação de “você quer mesmo sair deste assunto e ir para outro?”. - def handle_context_switch(self, message: str, user_id: int | None, target_domain_hint: str = "general") -> str | None: + def handle_context_switch( + self, + message: str, + user_id: int | None, + target_domain_hint: str = "general", + turn_decision: dict | None = None, + ) -> str | None: context = self.service._get_user_context(user_id) if not context: return None @@ -635,8 +698,8 @@ class ConversationPolicy: if pending_switch: if pending_switch["expires_at"] < datetime.utcnow(): context["pending_switch"] = None - elif self.is_context_switch_confirmation(message): - if self.service._is_affirmative_message(message): + elif self.is_context_switch_confirmation(message, turn_decision=turn_decision): + if self.service._is_affirmative_message(message) or self._decision_domain(turn_decision) == pending_switch["target_domain"]: target_domain = pending_switch["target_domain"] self.apply_domain_switch(user_id=user_id, target_domain=target_domain) return self.render_context_switched_message(target_domain=target_domain) diff --git a/app/services/orchestration/entity_normalizer.py b/app/services/orchestration/entity_normalizer.py index f6821a9..3f0d6e0 100644 --- a/app/services/orchestration/entity_normalizer.py +++ b/app/services/orchestration/entity_normalizer.py @@ -1,14 +1,22 @@ import json import logging import re -import unicodedata -from datetime import datetime, timedelta +from datetime import datetime +from pydantic import ValidationError + +from app.services.orchestration import technical_normalizer +from 
app.services.orchestration.turn_decision import TurnDecision logger = logging.getLogger(__name__) +# Essa classe concentra normalizacao tecnica e coercoes estruturadas. +# A semantica conversacional idealmente vem do modelo, nao daqui. class EntityNormalizer: + def empty_turn_decision(self) -> dict: + return TurnDecision().model_dump() + def empty_extraction_payload(self) -> dict: return { "generic_memory": {}, @@ -92,163 +100,66 @@ class EntityNormalizer: logger.warning("Extracao com JSON invalido apos recorte.") return None + def coerce_turn_decision(self, payload) -> dict: + if not isinstance(payload, dict): + return self.empty_turn_decision() + try: + model = TurnDecision.model_validate(payload) + except ValidationError: + logger.warning("Decisao de turno invalida; usando fallback estruturado.") + return self.empty_turn_decision() + + normalized_entities = { + "generic_memory": self.normalize_generic_fields(model.entities.generic_memory), + "review_fields": self.normalize_review_fields(model.entities.review_fields), + "review_management_fields": self.normalize_review_management_fields(model.entities.review_management_fields), + "order_fields": self.normalize_order_fields(model.entities.order_fields), + "cancel_order_fields": self.normalize_cancel_order_fields(model.entities.cancel_order_fields), + } + dumped = model.model_dump() + dumped["entities"] = normalized_entities + dumped["tool_arguments"] = dumped.get("tool_arguments") or {} + dumped["missing_fields"] = [str(field) for field in dumped.get("missing_fields") or [] if str(field).strip()] + return dumped + def normalize_text(self, text: str) -> str: - normalized = unicodedata.normalize("NFKD", text or "") - ascii_text = normalized.encode("ascii", "ignore").decode("ascii") - return ascii_text.lower() + return technical_normalizer.normalize_text(text) def normalize_plate(self, value) -> str | None: - text = str(value or "").strip().upper() - if not text: - return None - if 
re.fullmatch(r"[A-Z]{3}[0-9][A-Z0-9][0-9]{2}", text) or re.fullmatch(r"[A-Z]{3}[0-9]{4}", text): - return text - compact = re.sub(r"[^A-Z0-9]", "", text) - if re.fullmatch(r"[A-Z]{3}[0-9][A-Z0-9][0-9]{2}", compact) or re.fullmatch(r"[A-Z]{3}[0-9]{4}", compact): - return compact - return None + return technical_normalizer.normalize_plate(value) def normalize_cpf(self, value) -> str | None: - digits = re.sub(r"\D", "", str(value or "")) - if len(digits) == 11: - return digits - return None + return technical_normalizer.normalize_cpf(value) def normalize_positive_number(self, value) -> float | None: - if value is None: - return None - if isinstance(value, (int, float)): - number = float(value) - return number if number > 0 else None - text = self.normalize_text(str(value)) - text = text.replace("r$", "").strip() - multiplier = 1000 if "mil" in text else 1 - text = text.replace("mil", "").strip() - digits = re.sub(r"[^0-9,.\s]", "", text) - if not digits: - return None - numeric = digits.replace(".", "").replace(" ", "").replace(",", ".") - try: - number = float(numeric) * multiplier - return number if number > 0 else None - except ValueError: - return None + return technical_normalizer.normalize_positive_number(value) def normalize_vehicle_profile(self, value) -> list[str]: - if value is None: - return [] - allowed = {"suv", "sedan", "hatch", "pickup"} - items = value if isinstance(value, list) else [value] - normalized: list[str] = [] - for item in items: - marker = self.normalize_text(str(item)).strip() - if marker in allowed and marker not in normalized: - normalized.append(marker) - return normalized + return technical_normalizer.normalize_vehicle_profile(value) def normalize_bool(self, value) -> bool | None: - if isinstance(value, bool): - return value - lowered = self.normalize_text(str(value or "")).strip() - if lowered in {"sim", "true", "1", "yes"}: - return True - if lowered in {"nao", "false", "0", "no"}: - return False - return None + return 
technical_normalizer.normalize_bool(value) def normalize_datetime_connector(self, text: str) -> str: - compact = " ".join(str(text or "").strip().split()) - return re.sub(r"\s+[aàáâã]s\s+", " ", compact, flags=re.IGNORECASE).strip() + return technical_normalizer.normalize_datetime_connector(text) def try_parse_iso_datetime(self, text: str) -> datetime | None: - candidate = str(text or "").strip() - if not candidate: - return None - try: - return datetime.fromisoformat(candidate.replace("Z", "+00:00")) - except ValueError: - return None + return technical_normalizer.try_parse_iso_datetime(text) def try_parse_datetime_with_formats(self, text: str, formats: tuple[str, ...]) -> datetime | None: - candidate = str(text or "").strip() - if not candidate: - return None - for fmt in formats: - try: - return datetime.strptime(candidate, fmt) - except ValueError: - continue - return None + return technical_normalizer.try_parse_datetime_with_formats(text, formats) def try_parse_review_absolute_datetime(self, text: str) -> datetime | None: - normalized = self.normalize_datetime_connector(text) - parsed = self.try_parse_iso_datetime(normalized) - if parsed is not None: - return parsed - - day_first_formats = ( - "%d/%m/%Y %H:%M", - "%d/%m/%Y %H:%M:%S", - "%d-%m-%Y %H:%M", - "%d-%m-%Y %H:%M:%S", - ) - year_first_formats = ( - "%Y/%m/%d %H:%M", - "%Y/%m/%d %H:%M:%S", - "%Y-%m-%d %H:%M", - "%Y-%m-%d %H:%M:%S", - ) - return self.try_parse_datetime_with_formats(normalized, day_first_formats + year_first_formats) + return technical_normalizer.try_parse_review_absolute_datetime(text) def strip_token_edges(self, token: str) -> str: - cleaned = str(token or "").strip() - edge_chars = "[](){}<>,.;:!?\"'`" - while cleaned and cleaned[0] in edge_chars: - cleaned = cleaned[1:] - while cleaned and cleaned[-1] in edge_chars: - cleaned = cleaned[:-1] - return cleaned + return technical_normalizer.strip_token_edges(token) def extract_hhmm_from_text(self, text: str) -> str | None: - cleaned = 
self.normalize_datetime_connector(text) - for token in cleaned.split(): - normalized_token = self.strip_token_edges(token) - parts = normalized_token.split(":") - if len(parts) not in {2, 3}: - continue - if not all(part.isdigit() for part in parts): - continue - hour = int(parts[0]) - minute = int(parts[1]) - if 0 <= hour <= 23 and 0 <= minute <= 59: - return f"{hour:02d}:{minute:02d}" - return None + return technical_normalizer.extract_hhmm_from_text(text) def normalize_review_datetime_text(self, value) -> str | None: - text = str(value or "").strip() - if not text: - return None - - absolute_dt = self.try_parse_review_absolute_datetime(text) - if absolute_dt is not None: - return text - - normalized = self.normalize_text(text) - day_offset = None - if "amanha" in normalized: - day_offset = 1 - elif "hoje" in normalized: - day_offset = 0 - if day_offset is None: - return text - - time_text = self.extract_hhmm_from_text(normalized) - if not time_text: - return text - - hour_text, minute_text = time_text.split(":") - target_date = datetime.now() + timedelta(days=day_offset) - return f"{target_date.strftime('%d/%m/%Y')} {int(hour_text):02d}:{int(minute_text):02d}" + return technical_normalizer.normalize_review_datetime_text(value) def normalize_generic_fields(self, data) -> dict: if not isinstance(data, dict): @@ -295,48 +206,19 @@ class EntityNormalizer: return extracted def tokenize_text(self, text: str) -> list[str]: - return [token for token in str(text or "").split() if token] + return technical_normalizer.tokenize_text(text) def clean_protocol_token(self, token: str) -> str: - cleaned = str(token or "").strip().upper() - edge_chars = "[](){}<>,.;:!?\"'`" - while cleaned and cleaned[0] in edge_chars: - cleaned = cleaned[1:] - while cleaned and cleaned[-1] in edge_chars: - cleaned = cleaned[:-1] - return cleaned + return technical_normalizer.clean_protocol_token(token) def is_valid_protocol_suffix(self, value: str) -> bool: - if not value or len(value) < 4: - 
return False - return all(char.isalnum() for char in value) + return technical_normalizer.is_valid_protocol_suffix(value) def normalize_review_protocol(self, value: str) -> str | None: - candidate = self.clean_protocol_token(value) - if not candidate.startswith("REV-"): - return None - parts = candidate.split("-") - if len(parts) != 3: - return None - prefix, date_part, suffix_part = parts - if prefix != "REV": - return None - if len(date_part) != 8 or not date_part.isdigit(): - return None - try: - datetime.strptime(date_part, "%Y%m%d") - except ValueError: - return None - if not self.is_valid_protocol_suffix(suffix_part): - return None - return f"{prefix}-{date_part}-{suffix_part}" + return technical_normalizer.normalize_review_protocol(value) def extract_review_protocol_from_text(self, text: str) -> str | None: - for token in self.tokenize_text(text): - normalized = self.normalize_review_protocol(token) - if normalized: - return normalized - return self.normalize_review_protocol(str(text or "")) + return technical_normalizer.extract_review_protocol_from_text(text) def normalize_review_management_fields(self, data) -> dict: if not isinstance(data, dict): @@ -373,8 +255,8 @@ class EntityNormalizer: if not isinstance(data, dict): return {} extracted: dict = {} - order_number = str(data.get("numero_pedido") or "").strip().upper() - if order_number and re.fullmatch(r"PED-[A-Z0-9\\-]+", order_number): + order_number = technical_normalizer.normalize_order_number(data.get("numero_pedido")) + if order_number: extracted["numero_pedido"] = order_number reason = str(data.get("motivo") or "").strip(" .;") if reason: diff --git a/app/services/orchestration/message_planner.py b/app/services/orchestration/message_planner.py index 83b7895..6dc93f5 100644 --- a/app/services/orchestration/message_planner.py +++ b/app/services/orchestration/message_planner.py @@ -1,12 +1,16 @@ import logging +import json from app.services.ai.llm_service import LLMService from 
app.services.orchestration.entity_normalizer import EntityNormalizer +from app.services.orchestration.turn_decision import TurnDecision logger = logging.getLogger(__name__) +# Esse componente pede ao modelo contratos estruturados +# para roteamento, extracao tecnica e decisao por turno. class MessagePlanner: def __init__(self, llm: LLMService, normalizer: EntityNormalizer): self.llm = llm @@ -130,6 +134,46 @@ class MessagePlanner: logger.exception("Falha ao extrair entidades com LLM. user_id=%s", user_id) return default + async def extract_turn_decision(self, message: str, user_id: int | None) -> dict: + user_context = f"user_id={user_id}" if user_id is not None else "user_id=anonimo" + default = self.normalizer.empty_turn_decision() + schema_example = json.dumps(TurnDecision().model_dump(), ensure_ascii=True) + prompt = ( + "Analise a mensagem do usuario e retorne APENAS JSON valido seguindo o contrato de decisao por turno.\n" + "Nao use markdown. Nao escreva texto fora do JSON. Nao invente dados ausentes.\n" + "Use regex apenas para formatos tecnicos; a decisao semantica deve vir do modelo.\n\n" + "Contrato obrigatorio:\n" + f"{schema_example}\n\n" + "Regras:\n" + "- 'domain' deve ser review, sales ou general.\n" + "- 'intent' deve refletir a intencao principal do turno.\n" + "- 'action' deve ser uma das acoes do contrato.\n" + "- 'entities' deve manter as secoes generic_memory, review_fields, review_management_fields, order_fields e cancel_order_fields.\n" + "- Se faltar dado para continuar um fluxo, use action='ask_missing_fields' e preencha 'missing_fields' e 'response_to_user'.\n" + "- Se o usuario estiver escolhendo entre pedidos enfileirados (ex.: '1', '2', 'o segundo'), preencha 'selection_index' com base zero.\n" + "- Se for necessaria uma tool de orquestracao, use action compativel e preencha 'tool_name' e 'tool_arguments' quando apropriado.\n" + "- Se nao houver acao operacional, use action='answer_user'.\n\n" + f"Contexto: {user_context}\n" + 
f"Mensagem do usuario: {message}" + ) + + # Faz um retry curto quando o modelo devolve JSON invalido, + # evitando loops longos e mantendo fallback previsivel. + for attempt in range(2): + try: + result = await self.llm.generate_response(message=prompt, tools=[]) + text = (result.get("response") or "").strip() + payload = self.normalizer.parse_json_object(text) + decision = self.normalizer.coerce_turn_decision(payload) + if decision != default: + return decision + if attempt == 0: + logger.warning("Decisao estruturada invalida; repetindo uma vez. user_id=%s", user_id) + except Exception: + logger.exception("Falha ao extrair decisao por turno com LLM. user_id=%s", user_id) + break + return default + def resolve_entities_for_message_plan(self, message_plan: dict, routed_message: str) -> dict: default = self.normalizer.empty_extraction_payload() if not isinstance(message_plan, dict): diff --git a/app/services/orchestration/orquestrador_service.py b/app/services/orchestration/orquestrador_service.py index 5c437b9..90b543a 100644 --- a/app/services/orchestration/orquestrador_service.py +++ b/app/services/orchestration/orquestrador_service.py @@ -29,6 +29,8 @@ from app.services.tools.tool_registry import ToolRegistry logger = logging.getLogger(__name__) +# Coordenador principal do turno conversacional: +# atualiza estado, pede decisoes ao modelo, continua fluxos e executa tools. class OrquestradorService(ReviewFlowMixin, OrderFlowMixin): def __init__( self, @@ -66,13 +68,24 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin): ) self._upsert_user_context(user_id=user_id) + # Faz uma leitura inicial do turno para ajudar a policy + # com fila, troca de contexto e comandos globais. 
+ early_turn_decision = await self._extract_turn_decision_with_llm( + message=message, + user_id=user_id, + ) pending_order_selection = await self._try_resolve_pending_order_selection( message=message, user_id=user_id, + turn_decision=early_turn_decision, ) if pending_order_selection: return pending_order_selection - queued_followup = await self._try_continue_queued_order(message=message, user_id=user_id) + queued_followup = await self._try_continue_queued_order( + message=message, + user_id=user_id, + turn_decision=early_turn_decision, + ) if queued_followup: return queued_followup @@ -106,21 +119,39 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin): message_plan=message_plan, routed_message=routing_message, ) - if not self._has_useful_extraction(extracted_entities): - extracted_entities = await self._extract_entities_with_llm( + # Depois do roteamento para um unico pedido, pede a decisao + # estruturada do turno final que sera executado. + turn_decision = await self._extract_turn_decision_with_llm( + message=routing_message, + user_id=user_id, + ) + if self._has_useful_turn_decision(turn_decision): + extracted_entities = self._merge_extracted_entities( + extracted_entities, + self._extracted_entities_from_turn_decision(turn_decision), + ) + else: + llm_extracted_entities = await self._extract_entities_with_llm( message=routing_message, user_id=user_id, ) + extracted_entities = self._merge_extracted_entities( + extracted_entities, + llm_extracted_entities, + ) self._capture_generic_memory( user_id=user_id, llm_generic_fields=extracted_entities.get("generic_memory", {}), ) - domain_hint = self._domain_from_intents(extracted_entities.get("intents", {})) + domain_hint = self._domain_from_turn_decision(turn_decision) + if domain_hint == "general": + domain_hint = self._domain_from_intents(extracted_entities.get("intents", {})) context_switch_response = self._handle_context_switch( message=routing_message, user_id=user_id, target_domain_hint=domain_hint, + 
turn_decision=turn_decision, ) if context_switch_response: return await finish(context_switch_response, queue_notice=queue_notice) @@ -130,6 +161,7 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin): orchestration_override = await self._try_execute_orchestration_control_tool( message=routing_message, user_id=user_id, + turn_decision=turn_decision, extracted_entities=extracted_entities, queue_notice=queue_notice, finish=finish, @@ -137,11 +169,29 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin): if orchestration_override: return orchestration_override + decision_action = str(turn_decision.get("action") or "") + decision_response = str(turn_decision.get("response_to_user") or "").strip() + if decision_action == "ask_missing_fields" and decision_response: + return await finish(decision_response, queue_notice=queue_notice) + if decision_action == "answer_user" and decision_response: + return await finish(decision_response, queue_notice=queue_notice) + + planned_tool_response = await self._try_execute_business_tool_from_turn_decision( + message=routing_message, + user_id=user_id, + turn_decision=turn_decision, + queue_notice=queue_notice, + finish=finish, + ) + if planned_tool_response: + return planned_tool_response + review_management_response = await self._try_handle_review_management( message=routing_message, user_id=user_id, extracted_fields=extracted_entities.get("review_management_fields", {}), - intents=extracted_entities.get("intents", {}), + intents={}, + turn_decision=turn_decision, ) if review_management_response: return await finish(review_management_response, queue_notice=queue_notice) @@ -161,7 +211,8 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin): message=routing_message, user_id=user_id, extracted_fields=extracted_entities.get("review_fields", {}), - intents=extracted_entities.get("intents", {}), + intents={}, + turn_decision=turn_decision, ) if review_response: return await finish(review_response, 
queue_notice=queue_notice) @@ -170,7 +221,8 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin): message=routing_message, user_id=user_id, extracted_fields=extracted_entities.get("cancel_order_fields", {}), - intents=extracted_entities.get("intents", {}), + intents={}, + turn_decision=turn_decision, ) if cancel_order_response: return await finish(cancel_order_response, queue_notice=queue_notice) @@ -179,7 +231,8 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin): message=routing_message, user_id=user_id, extracted_fields=extracted_entities.get("order_fields", {}), - intents=extracted_entities.get("intents", {}), + intents={}, + turn_decision=turn_decision, ) if order_response: return await finish(order_response, queue_notice=queue_notice) @@ -221,6 +274,15 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin): user_id=user_id, ) return await finish(self._http_exception_detail(exc), queue_notice=queue_notice) + + stock_suggestion_response = await self._maybe_build_stock_suggestion_response( + tool_name=tool_name, + arguments=arguments, + tool_result=tool_result, + user_id=user_id, + ) + if stock_suggestion_response: + return await finish(stock_suggestion_response, queue_notice=queue_notice) self._capture_tool_result_context( tool_name=tool_name, tool_result=tool_result, @@ -266,10 +328,42 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin): self, message: str, user_id: int | None, + turn_decision: dict | None, extracted_entities: dict, queue_notice: str | None, finish, ) -> str | None: + decision = turn_decision or {} + decision_action = str(decision.get("action") or "").strip() + decision_tool_name = str(decision.get("tool_name") or "").strip() + decision_tool_arguments = decision.get("tool_arguments") if isinstance(decision.get("tool_arguments"), dict) else {} + + action_to_tool = { + "clear_context": "limpar_contexto_conversa", + "continue_queue": "continuar_proximo_pedido", + "discard_queue": "descartar_pedidos_pendentes", 
+ "cancel_active_flow": "cancelar_fluxo_atual", + } + planned_tool_name = decision_tool_name or action_to_tool.get(decision_action) + if planned_tool_name in ORCHESTRATION_CONTROL_TOOLS: + if ( + planned_tool_name == "cancelar_fluxo_atual" + and self.policy.should_defer_flow_cancellation_control(message=message, user_id=user_id) + ): + return None + try: + tool_result = await self.tool_executor.execute( + planned_tool_name, + decision_tool_arguments or {}, + user_id=user_id, + ) + except HTTPException as exc: + return await finish(self._http_exception_detail(exc), queue_notice=queue_notice) + return await finish( + self._fallback_format_tool_result(planned_tool_name, tool_result), + queue_notice=queue_notice, + ) + tools = self.registry.get_tools() llm_result = await self.llm.generate_response( message=self._build_router_prompt(user_message=message, user_id=user_id), @@ -338,6 +432,79 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin): queue_notice=queue_notice, ) + async def _try_execute_business_tool_from_turn_decision( + self, + message: str, + user_id: int | None, + turn_decision: dict | None, + queue_notice: str | None, + finish, + ) -> str | None: + decision = turn_decision or {} + if str(decision.get("action") or "").strip() != "call_tool": + return None + + tool_name = str(decision.get("tool_name") or "").strip() + if not tool_name or tool_name in ORCHESTRATION_CONTROL_TOOLS: + return None + + arguments = decision.get("tool_arguments") if isinstance(decision.get("tool_arguments"), dict) else {} + try: + tool_result = await self.tool_executor.execute( + tool_name, + arguments, + user_id=user_id, + ) + except HTTPException as exc: + self._capture_review_confirmation_suggestion( + tool_name=tool_name, + arguments=arguments, + exc=exc, + user_id=user_id, + ) + return await finish(self._http_exception_detail(exc), queue_notice=queue_notice) + + stock_suggestion_response = await self._maybe_build_stock_suggestion_response( + tool_name=tool_name, + 
arguments=arguments, + tool_result=tool_result, + user_id=user_id, + ) + if stock_suggestion_response: + return await finish(stock_suggestion_response, queue_notice=queue_notice) + + self._capture_tool_result_context( + tool_name=tool_name, + tool_result=tool_result, + user_id=user_id, + ) + + if self._should_use_deterministic_response(tool_name): + return await finish( + self._fallback_format_tool_result(tool_name, tool_result), + queue_notice=queue_notice, + ) + + final_response = await self.llm.generate_response( + message=self._build_result_prompt( + user_message=message, + user_id=user_id, + tool_name=tool_name, + tool_result=tool_result, + ), + tools=[], + ) + text = (final_response.get("response") or "").strip() + if self._is_low_value_response(text): + return await finish( + self._fallback_format_tool_result(tool_name, tool_result), + queue_notice=queue_notice, + ) + return await finish( + text or self._fallback_format_tool_result(tool_name, tool_result), + queue_notice=queue_notice, + ) + def _reset_pending_review_states(self, user_id: int | None) -> None: if user_id is None: return @@ -553,6 +720,69 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin): if sanitized: context["selected_vehicle"] = None + async def _maybe_build_stock_suggestion_response( + self, + tool_name: str, + arguments: dict | None, + tool_result, + user_id: int | None, + ) -> str | None: + if tool_name != "consultar_estoque" or not isinstance(tool_result, list) or tool_result: + return None + + budget = self._normalize_positive_number((arguments or {}).get("preco_max")) + if not budget: + return None + + relaxed_arguments = dict(arguments or {}) + relaxed_arguments["preco_max"] = max(float(budget) * 1.2, float(budget) + 10000.0) + relaxed_arguments["limite"] = min(max(int((arguments or {}).get("limite") or 5), 1), 5) + relaxed_arguments["ordenar_preco"] = "asc" + + try: + relaxed_result = await self.tool_executor.execute( + "consultar_estoque", + relaxed_arguments, + 
user_id=user_id, + ) + except HTTPException: + return None + + if not isinstance(relaxed_result, list): + return None + + nearby = [] + for item in relaxed_result: + if not isinstance(item, dict): + continue + try: + price = float(item.get("preco") or 0) + except (TypeError, ValueError): + continue + if price > float(budget): + nearby.append(item) + + if not nearby: + return None + + self._capture_tool_result_context( + tool_name="consultar_estoque", + tool_result=nearby, + user_id=user_id, + ) + + budget_label = f"R$ {float(budget):,.0f}".replace(",", ".") + lines = [f"Nao encontrei veiculos ate {budget_label}."] + lines.append("Mas achei algumas opcoes proximas ao seu orcamento:") + for idx, item in enumerate(nearby[:5], start=1): + modelo = str(item.get("modelo") or "N/A") + categoria = str(item.get("categoria") or "N/A") + codigo = item.get("id", "N/A") + preco = f"R$ {float(item.get('preco') or 0):,.2f}".replace(",", "X").replace(".", ",").replace("X", ".") + lines.append(f"{idx}. [{codigo}] {modelo} ({categoria}) - {preco}") + lines.append("Se quiser, responda com o numero da lista ou com o modelo.") + return "\n".join(lines) + def _new_tab_memory(self, user_id: int | None) -> dict: context = self._get_user_context(user_id) if not context: @@ -583,12 +813,64 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin): async def _extract_entities_with_llm(self, message: str, user_id: int | None) -> dict: return await self.planner.extract_entities(message=message, user_id=user_id) + async def _extract_turn_decision_with_llm(self, message: str, user_id: int | None) -> dict: + return await self.planner.extract_turn_decision(message=message, user_id=user_id) + def _resolve_entities_for_message_plan(self, message_plan: dict, routed_message: str) -> dict: return self.planner.resolve_entities_for_message_plan(message_plan=message_plan, routed_message=routed_message) def _has_useful_extraction(self, extraction: dict | None) -> bool: return 
self.normalizer.has_useful_extraction(extraction) + def _has_useful_turn_decision(self, turn_decision: dict | None) -> bool: + if not isinstance(turn_decision, dict): + return False + if (turn_decision.get("intent") or "general") != "general": + return True + if (turn_decision.get("action") or "answer_user") != "answer_user": + return True + entities = turn_decision.get("entities") + return self._has_useful_extraction(self._extracted_entities_from_turn_decision(turn_decision)) if isinstance(entities, dict) else False + + def _extracted_entities_from_turn_decision(self, turn_decision: dict | None) -> dict: + entities = (turn_decision or {}).get("entities") + if not isinstance(entities, dict): + entities = {} + return { + "generic_memory": entities.get("generic_memory", {}), + "review_fields": entities.get("review_fields", {}), + "review_management_fields": entities.get("review_management_fields", {}), + "order_fields": entities.get("order_fields", {}), + "cancel_order_fields": entities.get("cancel_order_fields", {}), + "intents": {}, + } + + def _merge_extracted_entities(self, base: dict | None, override: dict | None) -> dict: + merged = self._empty_extraction_payload() + for section in ("generic_memory", "review_fields", "review_management_fields", "order_fields", "cancel_order_fields"): + left = (base or {}).get(section) + right = (override or {}).get(section) + payload = {} + if isinstance(left, dict): + payload.update(left) + if isinstance(right, dict): + payload.update(right) + merged[section] = payload + + base_intents = (base or {}).get("intents") + override_intents = (override or {}).get("intents") + if isinstance(base_intents, dict): + merged["intents"].update(base_intents) + if isinstance(override_intents, dict): + merged["intents"].update(override_intents) + return merged + + def _domain_from_turn_decision(self, turn_decision: dict | None) -> str: + domain = str((turn_decision or {}).get("domain") or "general").strip().lower() + if domain in {"review", 
"sales", "general"}: + return domain + return "general" + def _parse_json_object(self, text: str): return self.normalizer.parse_json_object(text) @@ -722,8 +1004,8 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin): def _is_order_selection_reset_message(self, message: str) -> bool: return self.policy.is_order_selection_reset_message(message) - def _looks_like_fresh_operational_request(self, message: str) -> bool: - return self.policy.looks_like_fresh_operational_request(message) + def _looks_like_fresh_operational_request(self, message: str, turn_decision: dict | None = None) -> bool: + return self.policy.looks_like_fresh_operational_request(message, turn_decision=turn_decision) def _detect_selected_order_index(self, message: str, orders: list[dict]) -> tuple[int | None, bool]: return self.policy.detect_selected_order_index(message=message, orders=orders) @@ -732,8 +1014,13 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin): self, message: str, user_id: int | None, + turn_decision: dict | None = None, ) -> str | None: - return await self.policy.try_resolve_pending_order_selection(message=message, user_id=user_id) + return await self.policy.try_resolve_pending_order_selection( + message=message, + user_id=user_id, + turn_decision=turn_decision, + ) def _render_queue_notice(self, queued_count: int) -> str | None: return self.policy.render_queue_notice(queued_count) @@ -750,14 +1037,23 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin): def _domain_from_intents(self, intents: dict | None) -> str: return self.policy.domain_from_intents(intents) - def _is_context_switch_confirmation(self, message: str) -> bool: - return self.policy.is_context_switch_confirmation(message) + def _is_context_switch_confirmation(self, message: str, turn_decision: dict | None = None) -> bool: + return self.policy.is_context_switch_confirmation(message, turn_decision=turn_decision) - def _is_continue_queue_message(self, message: str) -> bool: - return 
self.policy.is_continue_queue_message(message) + def _is_continue_queue_message(self, message: str, turn_decision: dict | None = None) -> bool: + return self.policy.is_continue_queue_message(message, turn_decision=turn_decision) - async def _try_continue_queued_order(self, message: str, user_id: int | None) -> str | None: - return await self.policy.try_continue_queued_order(message=message, user_id=user_id) + async def _try_continue_queued_order( + self, + message: str, + user_id: int | None, + turn_decision: dict | None = None, + ) -> str | None: + return await self.policy.try_continue_queued_order( + message=message, + user_id=user_id, + turn_decision=turn_decision, + ) def _has_open_flow(self, user_id: int | None, domain: str) -> bool: return self.policy.has_open_flow(user_id=user_id, domain=domain) @@ -770,8 +1066,14 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin): message: str, user_id: int | None, target_domain_hint: str = "general", + turn_decision: dict | None = None, ) -> str | None: - return self.policy.handle_context_switch(message=message, user_id=user_id, target_domain_hint=target_domain_hint) + return self.policy.handle_context_switch( + message=message, + user_id=user_id, + target_domain_hint=target_domain_hint, + turn_decision=turn_decision, + ) def _update_active_domain(self, user_id: int | None, domain_hint: str = "general") -> None: self.policy.update_active_domain(user_id=user_id, domain_hint=domain_hint) diff --git a/app/services/orchestration/technical_normalizer.py b/app/services/orchestration/technical_normalizer.py new file mode 100644 index 0000000..2fd6fda --- /dev/null +++ b/app/services/orchestration/technical_normalizer.py @@ -0,0 +1,245 @@ +import re +import unicodedata +from datetime import datetime, timedelta + + +def normalize_text(text: str) -> str: + normalized = unicodedata.normalize("NFKD", text or "") + ascii_text = normalized.encode("ascii", "ignore").decode("ascii") + return ascii_text.lower() + + +def 
normalize_plate(value) -> str | None: + text = str(value or "").strip().upper() + if not text: + return None + if re.fullmatch(r"[A-Z]{3}[0-9][A-Z0-9][0-9]{2}", text) or re.fullmatch(r"[A-Z]{3}[0-9]{4}", text): + return text + compact = re.sub(r"[^A-Z0-9]", "", text) + if re.fullmatch(r"[A-Z]{3}[0-9][A-Z0-9][0-9]{2}", compact) or re.fullmatch(r"[A-Z]{3}[0-9]{4}", compact): + return compact + return None + + +def normalize_cpf(value) -> str | None: + digits = re.sub(r"\D", "", str(value or "")) + if len(digits) == 11: + return digits + return None + + +def is_valid_cpf(value) -> bool: + digits = normalize_cpf(value) + if not digits: + return False + if digits == digits[0] * 11: + return False + + numbers = [int(digit) for digit in digits] + sum_first = sum(number * weight for number, weight in zip(numbers[:9], range(10, 1, -1))) + first_digit = 11 - (sum_first % 11) + first_digit = 0 if first_digit >= 10 else first_digit + if first_digit != numbers[9]: + return False + + sum_second = sum(number * weight for number, weight in zip(numbers[:10], range(11, 1, -1))) + second_digit = 11 - (sum_second % 11) + second_digit = 0 if second_digit >= 10 else second_digit + return second_digit == numbers[10] + + +def normalize_positive_number(value) -> float | None: + if value is None: + return None + if isinstance(value, (int, float)): + number = float(value) + return number if number > 0 else None + text = normalize_text(str(value)) + text = text.replace("r$", "").strip() + multiplier = 1000 if "mil" in text else 1 + text = text.replace("mil", "").strip() + digits = re.sub(r"[^0-9,.\s]", "", text) + if not digits: + return None + numeric = digits.replace(".", "").replace(" ", "").replace(",", ".") + try: + number = float(numeric) * multiplier + return number if number > 0 else None + except ValueError: + return None + + +def normalize_vehicle_profile(value) -> list[str]: + if value is None: + return [] + allowed = {"suv", "sedan", "hatch", "pickup"} + items = value if 
isinstance(value, list) else [value] + normalized: list[str] = [] + for item in items: + marker = normalize_text(str(item)).strip() + if marker in allowed and marker not in normalized: + normalized.append(marker) + return normalized + + +def normalize_bool(value) -> bool | None: + if isinstance(value, bool): + return value + lowered = normalize_text(str(value or "")).strip() + if lowered in {"sim", "true", "1", "yes"}: + return True + if lowered in {"nao", "false", "0", "no"}: + return False + return None + + +def normalize_datetime_connector(text: str) -> str: + compact = " ".join(str(text or "").strip().split()) + return re.sub(r"\s+[aàáâã]s\s+", " ", compact, flags=re.IGNORECASE).strip() + + +def try_parse_iso_datetime(text: str) -> datetime | None: + candidate = str(text or "").strip() + if not candidate: + return None + try: + return datetime.fromisoformat(candidate.replace("Z", "+00:00")) + except ValueError: + return None + + +def try_parse_datetime_with_formats(text: str, formats: tuple[str, ...]) -> datetime | None: + candidate = str(text or "").strip() + if not candidate: + return None + for fmt in formats: + try: + return datetime.strptime(candidate, fmt) + except ValueError: + continue + return None + + +def try_parse_review_absolute_datetime(text: str) -> datetime | None: + normalized = normalize_datetime_connector(text) + parsed = try_parse_iso_datetime(normalized) + if parsed is not None: + return parsed + + day_first_formats = ( + "%d/%m/%Y %H:%M", + "%d/%m/%Y %H:%M:%S", + "%d-%m-%Y %H:%M", + "%d-%m-%Y %H:%M:%S", + ) + year_first_formats = ( + "%Y/%m/%d %H:%M", + "%Y/%m/%d %H:%M:%S", + "%Y-%m-%d %H:%M", + "%Y-%m-%d %H:%M:%S", + ) + return try_parse_datetime_with_formats(normalized, day_first_formats + year_first_formats) + + +def strip_token_edges(token: str) -> str: + cleaned = str(token or "").strip() + edge_chars = "[](){}<>,.;:!?\"'`" + while cleaned and cleaned[0] in edge_chars: + cleaned = cleaned[1:] + while cleaned and cleaned[-1] in 
edge_chars: + cleaned = cleaned[:-1] + return cleaned + + +def extract_hhmm_from_text(text: str) -> str | None: + cleaned = normalize_datetime_connector(text) + for token in cleaned.split(): + normalized_token = strip_token_edges(token) + parts = normalized_token.split(":") + if len(parts) not in {2, 3}: + continue + if not all(part.isdigit() for part in parts): + continue + hour = int(parts[0]) + minute = int(parts[1]) + if 0 <= hour <= 23 and 0 <= minute <= 59: + return f"{hour:02d}:{minute:02d}" + return None + + +def normalize_review_datetime_text(value, now_provider=None) -> str | None: + text = str(value or "").strip() + if not text: + return None + + absolute_dt = try_parse_review_absolute_datetime(text) + if absolute_dt is not None: + return text + + normalized = normalize_text(text) + day_offset = None + if "amanha" in normalized: + day_offset = 1 + elif "hoje" in normalized: + day_offset = 0 + if day_offset is None: + return text + + time_text = extract_hhmm_from_text(normalized) + if not time_text: + return text + + hour_text, minute_text = time_text.split(":") + current_datetime = now_provider() if callable(now_provider) else datetime.now() + target_date = current_datetime + timedelta(days=day_offset) + return f"{target_date.strftime('%d/%m/%Y')} {int(hour_text):02d}:{int(minute_text):02d}" + + +def tokenize_text(text: str) -> list[str]: + return [token for token in str(text or "").split() if token] + + +def clean_protocol_token(token: str) -> str: + return strip_token_edges(str(token or "").strip().upper()) + + +def is_valid_protocol_suffix(value: str) -> bool: + if not value or len(value) < 4: + return False + return all(char.isalnum() for char in value) + + +def normalize_review_protocol(value: str) -> str | None: + candidate = clean_protocol_token(value) + if not candidate.startswith("REV-"): + return None + parts = candidate.split("-") + if len(parts) != 3: + return None + prefix, date_part, suffix_part = parts + if prefix != "REV": + return None + 
if len(date_part) != 8 or not date_part.isdigit():
+        return None
+    try:
+        datetime.strptime(date_part, "%Y%m%d")
+    except ValueError:
+        return None
+    if not is_valid_protocol_suffix(suffix_part):
+        return None
+    return f"{prefix}-{date_part}-{suffix_part}"
+
+
+def extract_review_protocol_from_text(text: str) -> str | None:
+    for token in tokenize_text(text):
+        normalized = normalize_review_protocol(token)
+        if normalized:
+            return normalized
+    return normalize_review_protocol(str(text or ""))
+
+
+def normalize_order_number(value) -> str | None:
+    order_number = str(value or "").strip().upper()
+    if order_number and re.fullmatch(r"PED-[A-Z0-9-]+", order_number):
+        return order_number
+    return None
diff --git a/app/services/orchestration/turn_decision.py b/app/services/orchestration/turn_decision.py
new file mode 100644
index 0000000..69ece47
--- /dev/null
+++ b/app/services/orchestration/turn_decision.py
@@ -0,0 +1,72 @@
+from typing import Any, Literal
+
+from pydantic import BaseModel, ConfigDict, Field, model_validator
+
+
+# Esse modulo define o contrato estruturado esperado do modelo por turno.
+TurnDomain = Literal["review", "sales", "general"]
+TurnIntent = Literal[
+    "review_schedule",
+    "review_list",
+    "review_cancel",
+    "review_reschedule",
+    "order_create",
+    "order_cancel",
+    "inventory_search",
+    "conversation_reset",
+    "queue_continue",
+    "discard_queue",
+    "cancel_active_flow",
+    "general",
+]
+TurnAction = Literal[
+    "collect_review_schedule",
+    "collect_review_management",
+    "collect_order_create",
+    "collect_order_cancel",
+    "ask_missing_fields",
+    "answer_user",
+    "call_tool",
+    "clear_context",
+    "continue_queue",
+    "discard_queue",
+    "cancel_active_flow",
+]
+
+
+class DecisionEntities(BaseModel):
+    model_config = ConfigDict(extra="forbid")
+
+    # As entidades continuam separadas por tipo de fluxo para facilitar
+    # compatibilidade com os mixins e validadores tecnicos atuais. 
+ generic_memory: dict[str, Any] = Field(default_factory=dict) + review_fields: dict[str, Any] = Field(default_factory=dict) + review_management_fields: dict[str, Any] = Field(default_factory=dict) + order_fields: dict[str, Any] = Field(default_factory=dict) + cancel_order_fields: dict[str, Any] = Field(default_factory=dict) + + +class TurnDecision(BaseModel): + model_config = ConfigDict(extra="forbid") + + # O modelo decide a intencao, o dominio e a acao do turno. + intent: TurnIntent = "general" + domain: TurnDomain = "general" + action: TurnAction = "answer_user" + entities: DecisionEntities = Field(default_factory=DecisionEntities) + missing_fields: list[str] = Field(default_factory=list) + selection_index: int | None = None + tool_name: str | None = None + tool_arguments: dict[str, Any] = Field(default_factory=dict) + response_to_user: str | None = None + + @model_validator(mode="after") + def validate_contract(self): + if self.action == "ask_missing_fields": + if not self.missing_fields or not str(self.response_to_user or "").strip(): + raise ValueError("ask_missing_fields exige missing_fields e response_to_user") + if self.action == "call_tool" and not str(self.tool_name or "").strip(): + raise ValueError("call_tool exige tool_name") + if self.selection_index is not None and self.selection_index < 0: + raise ValueError("selection_index deve ser maior ou igual a zero") + return self diff --git a/tests/test_turn_decision_contract.py b/tests/test_turn_decision_contract.py new file mode 100644 index 0000000..fdb7c1f --- /dev/null +++ b/tests/test_turn_decision_contract.py @@ -0,0 +1,452 @@ +import os +import unittest + +os.environ.setdefault("DEBUG", "false") + +from datetime import datetime, timedelta + +from app.services.orchestration.conversation_policy import ConversationPolicy +from app.services.orchestration.entity_normalizer import EntityNormalizer +from app.services.orchestration.message_planner import MessagePlanner +from 
app.services.orchestration.orquestrador_service import OrquestradorService + + +class FakeLLM: + def __init__(self, responses): + self.responses = list(responses) + self.calls = 0 + + async def generate_response(self, message: str, tools): + self.calls += 1 + if self.responses: + return self.responses.pop(0) + return {"response": "", "tool_call": None} + + +class FakeState: + def __init__(self, entries=None, contexts=None): + self.entries = entries or {} + self.contexts = contexts or {} + + def get_entry(self, bucket: str, user_id: int | None, *, expire: bool = False): + if user_id is None: + return None + return self.entries.get(bucket, {}).get(user_id) + + def set_entry(self, bucket: str, user_id: int | None, value: dict): + if user_id is None: + return + self.entries.setdefault(bucket, {})[user_id] = value + + def pop_entry(self, bucket: str, user_id: int | None): + if user_id is None: + return None + return self.entries.get(bucket, {}).pop(user_id, None) + + +class FakeToolExecutor: + def __init__(self, result=None): + self.result = result or {"ok": True} + self.calls = [] + + async def execute(self, tool_name: str, arguments: dict, user_id: int | None = None): + self.calls.append((tool_name, arguments, user_id)) + if tool_name == "consultar_estoque" and arguments.get("preco_max") and float(arguments["preco_max"]) > 50000: + return [ + {"id": 7, "modelo": "Hyundai HB20 2022", "categoria": "hatch", "preco": 54500.0}, + {"id": 8, "modelo": "Chevrolet Onix 2023", "categoria": "hatch", "preco": 58900.0}, + ] + return self.result + + +class FakePolicyService: + def __init__(self, state): + self.state = state + self.normalizer = EntityNormalizer() + + def _get_user_context(self, user_id: int | None): + if user_id is None: + return None + return self.state.contexts.get(user_id) + + def _new_tab_memory(self, user_id: int | None): + return {} + + def _is_affirmative_message(self, text: str) -> bool: + normalized = 
self.normalizer.normalize_text(text).strip().rstrip(".!?,;:") + return normalized in {"sim", "pode", "ok", "confirmo", "aceito", "fechado", "pode sim"} + + def _is_negative_message(self, text: str) -> bool: + normalized = self.normalizer.normalize_text(text).strip().rstrip(".!?,;:") + return normalized in {"nao", "nao quero"} or normalized.startswith("nao") + + def _clear_user_conversation_state(self, user_id: int | None) -> None: + context = self._get_user_context(user_id) + if context: + context["pending_order_selection"] = None + + async def handle_message(self, message: str, user_id: int | None = None) -> str: + return f"handled:{message}" + + def _render_missing_review_fields_prompt(self, missing_fields: list[str]) -> str: + return "missing review" + + def _render_missing_review_reschedule_fields_prompt(self, missing_fields: list[str]) -> str: + return "missing review reschedule" + + def _render_missing_review_cancel_fields_prompt(self, missing_fields: list[str]) -> str: + return "missing review cancel" + + def _render_review_reuse_question(self) -> str: + return "reuse review?" 
+ + def _render_missing_order_fields_prompt(self, missing_fields: list[str]) -> str: + return "missing order" + + def _render_missing_cancel_order_fields_prompt(self, missing_fields: list[str]) -> str: + return "missing cancel order" + + +class TurnDecisionContractTests(unittest.IsolatedAsyncioTestCase): + async def test_extract_turn_decision_retries_once_and_returns_structured_payload(self): + llm = FakeLLM( + [ + {"response": "nao eh json", "tool_call": None}, + { + "response": """ + { + "intent": "review_schedule", + "domain": "review", + "action": "ask_missing_fields", + "entities": { + "generic_memory": {}, + "review_fields": {"placa": "abc1234", "data_hora": "10/03/2026 às 09:00"}, + "review_management_fields": {}, + "order_fields": {}, + "cancel_order_fields": {} + }, + "missing_fields": ["modelo", "ano", "km"], + "tool_name": null, + "tool_arguments": {}, + "response_to_user": "Preciso do modelo, ano e quilometragem." + } + """, + "tool_call": None, + }, + ] + ) + planner = MessagePlanner(llm=llm, normalizer=EntityNormalizer()) + + decision = await planner.extract_turn_decision("Quero agendar revisão amanhã às 09:00", user_id=7) + + self.assertEqual(llm.calls, 2) + self.assertEqual(decision["intent"], "review_schedule") + self.assertEqual(decision["domain"], "review") + self.assertEqual(decision["action"], "ask_missing_fields") + self.assertEqual(decision["entities"]["review_fields"]["placa"], "ABC1234") + self.assertEqual(decision["entities"]["review_fields"]["data_hora"], "10/03/2026 às 09:00") + self.assertEqual(decision["missing_fields"], ["modelo", "ano", "km"]) + + def test_coerce_turn_decision_rejects_invalid_shape_with_fallback(self): + normalizer = EntityNormalizer() + + decision = normalizer.coerce_turn_decision( + { + "intent": "valor_invalido", + "domain": "sales", + "action": "call_tool", + "entities": [], + } + ) + + self.assertEqual(decision["intent"], "general") + self.assertEqual(decision["domain"], "general") + 
self.assertEqual(decision["action"], "answer_user") + self.assertEqual(decision["entities"]["order_fields"], {}) + + def test_coerce_turn_decision_rejects_missing_fields_without_response_payload(self): + normalizer = EntityNormalizer() + + decision = normalizer.coerce_turn_decision( + { + "intent": "review_schedule", + "domain": "review", + "action": "ask_missing_fields", + "entities": { + "generic_memory": {}, + "review_fields": {}, + "review_management_fields": {}, + "order_fields": {}, + "cancel_order_fields": {}, + }, + "missing_fields": [], + "tool_name": None, + "tool_arguments": {}, + "response_to_user": "", + } + ) + + self.assertEqual(decision["intent"], "general") + self.assertEqual(decision["action"], "answer_user") + + def test_turn_decision_entities_do_not_rebuild_legacy_intents(self): + service = OrquestradorService.__new__(OrquestradorService) + service.normalizer = EntityNormalizer() + + extracted = service._extracted_entities_from_turn_decision( + { + "intent": "order_create", + "domain": "sales", + "action": "collect_order_create", + "entities": { + "generic_memory": {"cpf": "12345678909"}, + "review_fields": {}, + "review_management_fields": {}, + "order_fields": {"vehicle_id": 1}, + "cancel_order_fields": {}, + }, + } + ) + + self.assertEqual(extracted["intents"], {}) + self.assertEqual(extracted["order_fields"]["vehicle_id"], 1) + + def test_turn_decision_entity_merge_preserves_generic_memory_from_previous_extraction(self): + service = OrquestradorService.__new__(OrquestradorService) + service.normalizer = EntityNormalizer() + service._empty_extraction_payload = service.normalizer.empty_extraction_payload + + merged = service._merge_extracted_entities( + { + "generic_memory": {"orcamento_max": 70000}, + "review_fields": {}, + "review_management_fields": {}, + "order_fields": {"cpf": "12345678909"}, + "cancel_order_fields": {}, + "intents": {}, + }, + { + "generic_memory": {}, + "review_fields": {}, + "review_management_fields": {}, + 
"order_fields": {}, + "cancel_order_fields": {}, + "intents": {}, + }, + ) + + self.assertEqual(merged["generic_memory"]["orcamento_max"], 70000) + self.assertEqual(merged["order_fields"]["cpf"], "12345678909") + + def test_entity_merge_can_enrich_message_plan_with_full_extraction(self): + service = OrquestradorService.__new__(OrquestradorService) + service.normalizer = EntityNormalizer() + service._empty_extraction_payload = service.normalizer.empty_extraction_payload + + merged = service._merge_extracted_entities( + { + "generic_memory": {}, + "review_fields": {}, + "review_management_fields": {}, + "order_fields": {"cpf": "12345678909"}, + "cancel_order_fields": {}, + "intents": {}, + }, + { + "generic_memory": {"orcamento_max": 70000}, + "review_fields": {}, + "review_management_fields": {}, + "order_fields": {}, + "cancel_order_fields": {}, + "intents": {}, + }, + ) + + self.assertEqual(merged["generic_memory"]["orcamento_max"], 70000) + self.assertEqual(merged["order_fields"]["cpf"], "12345678909") + + async def test_turn_decision_call_tool_executes_without_router(self): + service = OrquestradorService.__new__(OrquestradorService) + service.tool_executor = FakeToolExecutor(result={"numero_pedido": "PED-1", "status": "Ativo"}) + service.llm = FakeLLM([]) + service._capture_review_confirmation_suggestion = lambda **kwargs: None + service._capture_tool_result_context = lambda **kwargs: None + service._should_use_deterministic_response = lambda tool_name: True + service._fallback_format_tool_result = lambda tool_name, tool_result: f"{tool_name}:{tool_result['numero_pedido']}" + service._build_result_prompt = lambda **kwargs: "unused" + service._http_exception_detail = lambda exc: str(exc) + service._is_low_value_response = lambda text: False + + async def finish(response: str, queue_notice: str | None = None) -> str: + return response if not queue_notice else f"{queue_notice}\n{response}" + + response = await service._try_execute_business_tool_from_turn_decision( 
+ message="quero fechar o pedido", + user_id=7, + turn_decision={ + "action": "call_tool", + "tool_name": "realizar_pedido", + "tool_arguments": {"cpf": "12345678909", "vehicle_id": 1}, + }, + queue_notice=None, + finish=finish, + ) + + self.assertEqual( + service.tool_executor.calls, + [("realizar_pedido", {"cpf": "12345678909", "vehicle_id": 1}, 7)], + ) + self.assertEqual(response, "realizar_pedido:PED-1") + self.assertEqual(service.llm.calls, 0) + + async def test_empty_stock_search_suggests_nearby_options(self): + service = OrquestradorService.__new__(OrquestradorService) + service.normalizer = EntityNormalizer() + service.tool_executor = FakeToolExecutor(result=[]) + service._get_user_context = lambda user_id: { + "generic_memory": {}, + "shared_memory": {}, + "last_stock_results": [], + "selected_vehicle": None, + } + service._capture_tool_result_context = lambda tool_name, tool_result, user_id: None + service._normalize_positive_number = service.normalizer.normalize_positive_number + + response = await service._maybe_build_stock_suggestion_response( + tool_name="consultar_estoque", + arguments={"preco_max": 50000, "limite": 5}, + tool_result=[], + user_id=5, + ) + + self.assertIn("Nao encontrei veiculos ate R$ 50.000.", response) + self.assertIn("Hyundai HB20 2022", response) + self.assertIn("Se quiser, responda com o numero da lista ou com o modelo.", response) + + async def test_turn_decision_answer_user_can_short_circuit_router(self): + decision = { + "intent": "general", + "domain": "general", + "action": "answer_user", + "response_to_user": "Resposta direta do contrato.", + } + + self.assertEqual(str(decision.get("action") or ""), "answer_user") + self.assertEqual(str(decision.get("response_to_user") or "").strip(), "Resposta direta do contrato.") + + async def test_pending_order_selection_prefers_turn_decision_domain(self): + state = FakeState( + contexts={ + 9: { + "pending_order_selection": { + "orders": [ + {"domain": "review", "message": "agendar 
revisao", "memory_seed": {}}, + {"domain": "sales", "message": "fazer pedido", "memory_seed": {}}, + ], + "expires_at": datetime.utcnow() + timedelta(minutes=15), + }, + "order_queue": [], + "active_domain": "general", + "generic_memory": {}, + } + } + ) + policy = ConversationPolicy(service=FakePolicyService(state)) + + response = await policy.try_resolve_pending_order_selection( + message="quero comprar", + user_id=9, + turn_decision={"domain": "sales", "intent": "order_create", "action": "collect_order_create"}, + ) + + self.assertIn("Vou comecar por: Venda: fazer pedido", response) + + async def test_pending_order_selection_prefers_turn_decision_selection_index(self): + state = FakeState( + contexts={ + 9: { + "pending_order_selection": { + "orders": [ + {"domain": "review", "message": "agendar revisao", "memory_seed": {}}, + {"domain": "sales", "message": "fazer pedido", "memory_seed": {}}, + ], + "expires_at": datetime.utcnow() + timedelta(minutes=15), + }, + "order_queue": [], + "active_domain": "general", + "generic_memory": {}, + } + } + ) + policy = ConversationPolicy(service=FakePolicyService(state)) + + response = await policy.try_resolve_pending_order_selection( + message="esse", + user_id=9, + turn_decision={"domain": "general", "intent": "general", "action": "answer_user", "selection_index": 1}, + ) + + self.assertIn("Vou comecar por: Venda: fazer pedido", response) + + async def test_try_continue_queue_prefers_turn_decision_action(self): + state = FakeState( + contexts={ + 9: { + "pending_switch": { + "target_domain": "sales", + "queued_message": "fazer pedido", + "memory_seed": {"cpf": "12345678909"}, + "expires_at": datetime.utcnow() + timedelta(minutes=15), + }, + "active_domain": "general", + "generic_memory": {}, + "pending_order_selection": None, + } + } + ) + service = FakePolicyService(state) + policy = ConversationPolicy(service=service) + policy.apply_domain_switch = lambda user_id, target_domain: service._get_user_context(user_id).update( 
+ {"active_domain": target_domain, "pending_switch": None} + ) + + response = await policy.try_continue_queued_order( + message="ok", + user_id=9, + turn_decision={"action": "continue_queue", "intent": "queue_continue", "domain": "sales"}, + ) + + self.assertIn("Agora, sobre a compra do veiculo:", response) + + def test_handle_context_switch_prefers_turn_decision_domain_confirmation(self): + state = FakeState( + contexts={ + 9: { + "pending_switch": { + "target_domain": "review", + "expires_at": datetime.utcnow() + timedelta(minutes=15), + }, + "active_domain": "sales", + "generic_memory": {}, + "pending_order_selection": None, + } + } + ) + service = FakePolicyService(state) + policy = ConversationPolicy(service=service) + policy.apply_domain_switch = lambda user_id, target_domain: service._get_user_context(user_id).update( + {"active_domain": target_domain, "pending_switch": None} + ) + + response = policy.handle_context_switch( + message="quero revisar", + user_id=9, + target_domain_hint="review", + turn_decision={"domain": "review", "intent": "review_schedule", "action": "collect_review_schedule"}, + ) + + self.assertEqual(response, "Certo, contexto anterior encerrado. Vamos seguir com agendamento de revisao.") + + +if __name__ == "__main__": + unittest.main()