From 57dc8242422c19a63ecb7bee992f08e99cd3fad7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Vitor=20Hugo=20Belorio=20Sim=C3=A3o?= Date: Mon, 9 Mar 2026 10:08:22 -0300 Subject: [PATCH] =?UTF-8?q?=F0=9F=A7=A0=20feat(orquestrador):=20ampliar=20?= =?UTF-8?q?controle=20conversacional=20e=20hidratar=20cliente=20mock=20por?= =?UTF-8?q?=20CPF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - adiciona tools de orquestracao para limpar contexto, descartar fila, cancelar fluxo e continuar pedidos pendentes - prioriza a decisao do LLM para comandos globais antes dos slot fillings ativos - melhora selecao entre pedidos concorrentes e resposta deterministica das tools de orquestracao - estrutura conflitos de horario de revisao para facilitar confirmacao posterior - reaproveita CPF da memoria/perfil do usuario no fluxo de compra - cria servico mock para hidratar customer e vincular users.cpf ao informar um CPF valido --- app/api/routes/mock.py | 18 + app/api/schemas.py | 5 + app/db/tool_seed.py | 65 ++ app/services/flows/order_flow.py | 39 ++ .../orchestration/conversation_state_store.py | 1 + .../orchestration/orchestrator_config.py | 12 + .../orchestration/orquestrador_service.py | 608 +++++++++++++++++- app/services/orchestration/prompt_builders.py | 7 + .../orchestration/response_formatter.py | 13 + app/services/tools/handlers.py | 58 +- app/services/tools/tool_registry.py | 7 +- app/services/user/mock_customer_service.py | 88 +++ 12 files changed, 877 insertions(+), 44 deletions(-) create mode 100644 app/services/user/mock_customer_service.py diff --git a/app/api/routes/mock.py b/app/api/routes/mock.py index 786067f..9b4341d 100644 --- a/app/api/routes/mock.py +++ b/app/api/routes/mock.py @@ -11,10 +11,12 @@ from app.api.schemas import ( CancelarPedidoRequest, ConsultarEstoqueRequest, EditarDataRevisaoRequest, + HidratarClienteMockRequest, ListarAgendamentosRevisaoRequest, RealizarPedidoRequest, ValidarClienteVendaRequest, ) +from app.services.user.mock_customer_service import hydrate_mock_customer_from_cpf from app.services.tools.handlers import ( agendar_revisao, avaliar_veiculo_troca, @@ -60,6 +62,22 @@ async def validar_cliente_venda_endpoint( raise HTTPException(status_code=503, detail=db_error_detail(exc)) +@router.post("/hidratar-cliente-mock") +async def hidratar_cliente_mock_endpoint( + body: HidratarClienteMockRequest, +) -> Dict[str, Any]: + """Cria ou vincula um cliente mock a partir do CPF informado, simulando consulta externa.""" + try: + return await hydrate_mock_customer_from_cpf( + cpf=body.cpf, + user_id=body.user_id, + ) + except ValueError as exc: + raise HTTPException(status_code=400, detail=str(exc)) + except SQLAlchemyError as exc: + raise HTTPException(status_code=503, detail=db_error_detail(exc)) + + @router.post("/avaliar-veiculo-troca") async def avaliar_veiculo_troca_endpoint( body: AvaliarVeiculoTrocaRequest, diff --git a/app/api/schemas.py b/app/api/schemas.py index 71c27df..da87bf3 100644 --- a/app/api/schemas.py +++ b/app/api/schemas.py @@ -39,6 +39,11 @@ class ValidarClienteVendaRequest(BaseModel): valor_veiculo: float +class HidratarClienteMockRequest(BaseModel): + cpf: str + user_id: Optional[int] = None + + class AvaliarVeiculoTrocaRequest(BaseModel): modelo: str ano: int diff --git a/app/db/tool_seed.py b/app/db/tool_seed.py index f48fae7..2b81582 100644 --- a/app/db/tool_seed.py +++ b/app/db/tool_seed.py @@ -258,6 +258,71 @@ def get_tools_definitions(): "required": ["numero_pedido", "motivo"], }, }, + { + "name": "limpar_contexto_conversa", + "description": ( + "Use esta ferramenta quando o usuario pedir para recomecar o atendimento, " + "esquecer o contexto atual, limpar memoria volatil ou iniciar do zero. " + "Ela limpa fila de pedidos, fluxos pendentes e contexto ativo do usuario." + ), + "parameters": { + "type": "object", + "properties": { + "motivo": { + "type": "string", + "description": "Resumo curto do motivo da limpeza de contexto. Opcional.", + }, + }, + "required": [], + }, + }, + { + "name": "continuar_proximo_pedido", + "description": ( + "Use esta ferramenta quando o usuario pedir para continuar, seguir ou abrir " + "o proximo pedido que ficou na fila do atendimento." + ), + "parameters": { + "type": "object", + "properties": {}, + "required": [], + }, + }, + { + "name": "descartar_pedidos_pendentes", + "description": ( + "Use esta ferramenta quando o usuario pedir para cancelar, ignorar ou " + "descartar apenas os pedidos pendentes da fila, sem apagar necessariamente " + "todo o contexto da conversa." + ), + "parameters": { + "type": "object", + "properties": { + "motivo": { + "type": "string", + "description": "Resumo curto do motivo para descartar a fila pendente. Opcional.", + }, + }, + "required": [], + }, + }, + { + "name": "cancelar_fluxo_atual", + "description": ( + "Use esta ferramenta quando o usuario pedir para cancelar apenas o fluxo " + "atual em andamento, sem limpar toda a memoria da conversa." + ), + "parameters": { + "type": "object", + "properties": { + "motivo": { + "type": "string", + "description": "Resumo curto do motivo do cancelamento do fluxo atual. Opcional.", + }, + }, + "required": [], + }, + }, ] diff --git a/app/services/flows/order_flow.py b/app/services/flows/order_flow.py index b259b4f..cfad758 100644 --- a/app/services/flows/order_flow.py +++ b/app/services/flows/order_flow.py @@ -3,12 +3,15 @@ from datetime import datetime, timedelta from fastapi import HTTPException +from app.db.mock_database import SessionMockLocal +from app.db.mock_models import User from app.services.orchestration.orchestrator_config import ( CANCEL_ORDER_REQUIRED_FIELDS, ORDER_REQUIRED_FIELDS, PENDING_CANCEL_ORDER_DRAFT_TTL_MINUTES, PENDING_ORDER_DRAFT_TTL_MINUTES, ) +from app.services.user.mock_customer_service import hydrate_mock_customer_from_cpf class OrderFlowMixin: @@ -44,6 +47,30 @@ class OrderFlowMixin: if isinstance(budget, (int, float)) and budget > 0: payload["valor_veiculo"] = float(budget) + def _try_prefill_order_cpf_from_memory(self, user_id: int | None, payload: dict) -> None: + if user_id is None or payload.get("cpf"): + return + + context = self._get_user_context(user_id) + if not context: + return + memory = context.get("generic_memory", {}) + cpf = memory.get("cpf") + if isinstance(cpf, str) and self._is_valid_cpf(cpf): + payload["cpf"] = cpf + + def _try_prefill_order_cpf_from_user_profile(self, user_id: int | None, payload: dict) -> None: + if user_id is None or payload.get("cpf"): + return + + db = SessionMockLocal() + try: + user = db.query(User).filter(User.id == user_id).first() + if user and isinstance(user.cpf, str) and self._is_valid_cpf(user.cpf): + payload["cpf"] = user.cpf + finally: + db.close() + def _render_missing_order_fields_prompt(self, missing_fields: list[str]) -> str: labels = { "cpf": "o CPF do cliente", @@ -101,6 +128,8 @@ class OrderFlowMixin: } draft["payload"].update(extracted) + self._try_prefill_order_cpf_from_memory(user_id=user_id, payload=draft["payload"]) + self._try_prefill_order_cpf_from_user_profile(user_id=user_id, payload=draft["payload"]) self._try_prefill_order_value_from_memory(user_id=user_id, payload=draft["payload"]) cpf_value = draft["payload"].get("cpf") @@ -108,6 +137,16 @@ class OrderFlowMixin: draft["payload"].pop("cpf", None) self.state.set_entry("pending_order_drafts", user_id, draft) return "Para seguir com o pedido, preciso de um CPF valido com 11 digitos." + if cpf_value: + try: + await hydrate_mock_customer_from_cpf( + cpf=str(cpf_value), + user_id=user_id, + ) + except ValueError: + draft["payload"].pop("cpf", None) + self.state.set_entry("pending_order_drafts", user_id, draft) + return "Para seguir com o pedido, preciso de um CPF valido com 11 digitos." valor = draft["payload"].get("valor_veiculo") if valor is not None: diff --git a/app/services/orchestration/conversation_state_store.py b/app/services/orchestration/conversation_state_store.py index bd96602..860d0a0 100644 --- a/app/services/orchestration/conversation_state_store.py +++ b/app/services/orchestration/conversation_state_store.py @@ -25,6 +25,7 @@ class ConversationStateStore: "generic_memory": {}, "shared_memory": {}, "order_queue": [], + "pending_order_selection": None, "pending_switch": None, "expires_at": now + timedelta(minutes=ttl_minutes), } diff --git a/app/services/orchestration/orchestrator_config.py b/app/services/orchestration/orchestrator_config.py index afd219b..5787f83 100644 --- a/app/services/orchestration/orchestrator_config.py +++ b/app/services/orchestration/orchestrator_config.py @@ -1,4 +1,5 @@ USER_CONTEXT_TTL_MINUTES = 60 +PENDING_ORDER_SELECTION_TTL_MINUTES = 15 PENDING_REVIEW_TTL_MINUTES = 30 PENDING_REVIEW_DRAFT_TTL_MINUTES = 30 @@ -46,4 +47,15 @@ DETERMINISTIC_RESPONSE_TOOLS = { "editar_data_revisao", "cancelar_pedido", "realizar_pedido", + "limpar_contexto_conversa", + "continuar_proximo_pedido", + "descartar_pedidos_pendentes", + "cancelar_fluxo_atual", +} + +ORCHESTRATION_CONTROL_TOOLS = { + "limpar_contexto_conversa", + "continuar_proximo_pedido", + "descartar_pedidos_pendentes", + "cancelar_fluxo_atual", } diff --git a/app/services/orchestration/orquestrador_service.py b/app/services/orchestration/orquestrador_service.py index 172b294..a78826c 100644 --- a/app/services/orchestration/orquestrador_service.py +++ b/app/services/orchestration/orquestrador_service.py @@ -10,6 +10,8 @@ from sqlalchemy.orm import Session from app.services.orchestration.orchestrator_config import ( DETERMINISTIC_RESPONSE_TOOLS, LOW_VALUE_RESPONSES, + ORCHESTRATION_CONTROL_TOOLS, + PENDING_ORDER_SELECTION_TTL_MINUTES, PENDING_REVIEW_TTL_MINUTES, USER_CONTEXT_TTL_MINUTES, ) @@ -34,7 +36,15 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin): def __init__(self, db: Session): """Inicializa servicos de LLM e registro de tools para a sessao atual.""" self.llm = LLMService() - self.registry = ToolRegistry(db) + self.registry = ToolRegistry(db, extra_handlers=self._build_orchestration_tool_handlers()) + + def _build_orchestration_tool_handlers(self) -> dict: + return { + "limpar_contexto_conversa": self._tool_limpar_contexto_conversa, + "continuar_proximo_pedido": self._tool_continuar_proximo_pedido, + "descartar_pedidos_pendentes": self._tool_descartar_pedidos_pendentes, + "cancelar_fluxo_atual": self._tool_cancelar_fluxo_atual, + } async def handle_message(self, message: str, user_id: int | None = None) -> str: """Processa mensagem, executa tool quando necessario e retorna resposta final.""" @@ -50,6 +60,12 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin): ) self._upsert_user_context(user_id=user_id) + pending_order_selection = await self._try_resolve_pending_order_selection( + message=message, + user_id=user_id, + ) + if pending_order_selection: + return pending_order_selection queued_followup = await self._try_continue_queued_order(message=message, user_id=user_id) if queued_followup: return queued_followup @@ -105,6 +121,16 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin): self._update_active_domain(user_id=user_id, domain_hint=domain_hint) + orchestration_override = await self._try_execute_orchestration_control_tool( + message=routing_message, + user_id=user_id, + extracted_entities=extracted_entities, + queue_notice=queue_notice, + finish=finish, + ) + if orchestration_override: + return orchestration_override + review_management_response = await self._try_handle_review_management( message=routing_message, user_id=user_id, @@ -225,6 +251,72 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin): ) return await finish(text, queue_notice=queue_notice) + async def _try_execute_orchestration_control_tool( + self, + message: str, + user_id: int | None, + extracted_entities: dict, + queue_notice: str | None, + finish, + ) -> str | None: + tools = self.registry.get_tools() + llm_result = await self.llm.generate_response( + message=self._build_router_prompt(user_message=message, user_id=user_id), + tools=tools, + ) + + tool_call = llm_result.get("tool_call") or {} + tool_name = tool_call.get("name") + if tool_name in ORCHESTRATION_CONTROL_TOOLS: + try: + tool_result = await self.registry.execute( + tool_name, + tool_call.get("arguments") or {}, + user_id=user_id, + ) + except HTTPException as exc: + return await finish(self._http_exception_detail(exc), queue_notice=queue_notice) + return await finish( + self._fallback_format_tool_result(tool_name, tool_result), + queue_notice=queue_notice, + ) + + first_pass_text = (llm_result.get("response") or "").strip() + should_force_tool = ( + not tool_name + and ( + self._has_open_flow(user_id=user_id, domain="review") + or self._has_open_flow(user_id=user_id, domain="sales") + or bool((self._get_user_context(user_id) or {}).get("pending_switch")) + or bool((self._get_user_context(user_id) or {}).get("order_queue")) + ) + and self._is_low_value_response(first_pass_text) + ) + if not should_force_tool: + return None + + llm_result = await self.llm.generate_response( + message=self._build_force_tool_prompt(user_message=message, user_id=user_id), + tools=tools, + ) + forced_tool_call = llm_result.get("tool_call") or {} + forced_tool_name = forced_tool_call.get("name") + if forced_tool_name not in ORCHESTRATION_CONTROL_TOOLS: + return None + + try: + tool_result = await self.registry.execute( + forced_tool_name, + forced_tool_call.get("arguments") or {}, + user_id=user_id, + ) + except HTTPException as exc: + return await finish(self._http_exception_detail(exc), queue_notice=queue_notice) + return await finish( + self._fallback_format_tool_result(forced_tool_name, tool_result), + queue_notice=queue_notice, + ) + def _reset_pending_review_states(self, user_id: int | None) -> None: if user_id is None: return @@ -239,6 +331,127 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin): self.state.pop_entry("pending_order_drafts", user_id) self.state.pop_entry("pending_cancel_order_drafts", user_id) + def _clear_user_conversation_state(self, user_id: int | None) -> None: + context = self._get_user_context(user_id) + if not context: + return + self._reset_pending_review_states(user_id=user_id) + self._reset_pending_order_states(user_id=user_id) + context["active_domain"] = "general" + context["generic_memory"] = {} + context["shared_memory"] = {} + context["order_queue"] = [] + context["pending_order_selection"] = None + context["pending_switch"] = None + + def _clear_pending_order_navigation(self, user_id: int | None) -> int: + context = self._get_user_context(user_id) + if not context: + return 0 + dropped = len(context.get("order_queue", [])) + if context.get("pending_switch"): + dropped += 1 + if context.get("pending_order_selection"): + pending_orders = context["pending_order_selection"].get("orders") or [] + dropped += len(pending_orders) + context["order_queue"] = [] + context["pending_switch"] = None + context["pending_order_selection"] = None + return dropped + + def _cancel_active_flow(self, user_id: int | None) -> str: + context = self._get_user_context(user_id) + if not context: + return "Nao havia contexto ativo para cancelar." + + active_domain = context.get("active_domain", "general") + had_flow = self._has_open_flow(user_id=user_id, domain=active_domain) + if active_domain == "review": + self._reset_pending_review_states(user_id=user_id) + elif active_domain == "sales": + self._reset_pending_order_states(user_id=user_id) + + context["pending_switch"] = None + if had_flow: + return f"Fluxo atual de {self._domain_label(active_domain)} cancelado." + return "Nao havia fluxo em andamento para cancelar." + + async def _continue_next_order_now(self, user_id: int | None) -> str: + context = self._get_user_context(user_id) + if not context: + return "Nao encontrei contexto ativo para continuar." + + if context.get("pending_order_selection"): + return "Ainda preciso que voce escolha qual das duas acoes deseja iniciar primeiro." + + pending_switch = context.get("pending_switch") + if isinstance(pending_switch, dict): + queued_message = str(pending_switch.get("queued_message") or "").strip() + if queued_message: + target_domain = str(pending_switch.get("target_domain") or "general") + memory_seed = dict(pending_switch.get("memory_seed") or {}) + self._apply_domain_switch(user_id=user_id, target_domain=target_domain) + refreshed = self._get_user_context(user_id) + if refreshed is not None: + refreshed["generic_memory"] = memory_seed + transition = self._build_next_order_transition(target_domain) + next_response = await self.handle_message(queued_message, user_id=user_id) + return f"{transition}\n{next_response}" + + next_order = self._pop_next_order(user_id=user_id) + if not next_order: + return "Nao ha pedidos pendentes na fila para continuar." + + target_domain = str(next_order.get("domain") or "general") + memory_seed = dict(next_order.get("memory_seed") or self._new_tab_memory(user_id=user_id)) + self._apply_domain_switch(user_id=user_id, target_domain=target_domain) + refreshed = self._get_user_context(user_id) + if refreshed is not None: + refreshed["generic_memory"] = memory_seed + transition = self._build_next_order_transition(target_domain) + next_response = await self.handle_message(str(next_order.get("message") or ""), user_id=user_id) + return f"{transition}\n{next_response}" + + async def _tool_limpar_contexto_conversa( + self, + motivo: str | None = None, + user_id: int | None = None, + ) -> dict: + self._clear_user_conversation_state(user_id=user_id) + message = "Contexto da conversa limpo. Podemos recomecar do zero." + if motivo: + message = f"{message}\nMotivo registrado: {motivo.strip()}" + return {"message": message} + + async def _tool_descartar_pedidos_pendentes( + self, + motivo: str | None = None, + user_id: int | None = None, + ) -> dict: + dropped = self._clear_pending_order_navigation(user_id=user_id) + if dropped <= 0: + message = "Nao havia pedidos pendentes na fila para descartar." + elif dropped == 1: + message = "Descartei 1 pedido pendente da fila." + else: + message = f"Descartei {dropped} pedidos pendentes da fila." + if motivo: + message = f"{message}\nMotivo registrado: {motivo.strip()}" + return {"message": message} + + async def _tool_cancelar_fluxo_atual( + self, + motivo: str | None = None, + user_id: int | None = None, + ) -> dict: + message = self._cancel_active_flow(user_id=user_id) + if motivo: + message = f"{message}\nMotivo registrado: {motivo.strip()}" + return {"message": message} + + async def _tool_continuar_proximo_pedido(self, user_id: int | None = None) -> str: + return await self._continue_next_order_now(user_id=user_id) + # Nessa função é onde eu configuro a memória volátil do sistema def _upsert_user_context(self, user_id: int | None) -> None: self.state.upsert_user_context(user_id=user_id, ttl_minutes=USER_CONTEXT_TTL_MINUTES) @@ -597,18 +810,77 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin): return False return None + def _normalize_datetime_connector(self, text: str) -> str: + compact = " ".join(str(text or "").strip().split()) + lowered = compact.lower() + marker = " as " + if marker in lowered: + index = lowered.index(marker) + return f"{compact[:index]} {compact[index + len(marker):]}".strip() + return compact + + def _try_parse_iso_datetime(self, text: str) -> datetime | None: + candidate = str(text or "").strip() + if not candidate: + return None + try: + return datetime.fromisoformat(candidate.replace("Z", "+00:00")) + except ValueError: + return None + + def _try_parse_datetime_with_formats(self, text: str, formats: tuple[str, ...]) -> datetime | None: + candidate = str(text or "").strip() + if not candidate: + return None + for fmt in formats: + try: + return datetime.strptime(candidate, fmt) + except ValueError: + continue + return None + + def _try_parse_review_absolute_datetime(self, text: str) -> datetime | None: + normalized = self._normalize_datetime_connector(text) + parsed = self._try_parse_iso_datetime(normalized) + if parsed is not None: + return parsed + + day_first_formats = ( + "%d/%m/%Y %H:%M", + "%d/%m/%Y %H:%M:%S", + "%d-%m-%Y %H:%M", + "%d-%m-%Y %H:%M:%S", + ) + year_first_formats = ( + "%Y/%m/%d %H:%M", + "%Y/%m/%d %H:%M:%S", + "%Y-%m-%d %H:%M", + "%Y-%m-%d %H:%M:%S", + ) + return self._try_parse_datetime_with_formats(normalized, day_first_formats + year_first_formats) + + def _extract_hhmm_from_text(self, text: str) -> str | None: + cleaned = self._normalize_datetime_connector(text) + for token in cleaned.split(): + normalized_token = self._strip_token_edges(token) + parts = normalized_token.split(":") + if len(parts) not in {2, 3}: + continue + if not all(part.isdigit() for part in parts): + continue + hour = int(parts[0]) + minute = int(parts[1]) + if 0 <= hour <= 23 and 0 <= minute <= 59: + return f"{hour:02d}:{minute:02d}" + return None + def _normalize_review_datetime_text(self, value) -> str | None: text = str(value or "").strip() if not text: return None - # Mantem formatos absolutos que o handler ja sabe interpretar. - absolute_patterns = ( - r"^\d{1,2}[/-]\d{1,2}[/-]\d{4}\s+(?:as\s+)?\d{1,2}:\d{2}(?::\d{2})?$", - r"^\d{4}[/-]\d{1,2}[/-]\d{1,2}\s+(?:as\s+)?\d{1,2}:\d{2}(?::\d{2})?$", - r"^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}(?::\d{2})?(?:Z|[+-]\d{2}:\d{2})?$", - ) - if any(re.match(pattern, text, flags=re.IGNORECASE) for pattern in absolute_patterns): + absolute_dt = self._try_parse_review_absolute_datetime(text) + if absolute_dt is not None: return text normalized = self._normalize_text(text) @@ -620,12 +892,13 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin): if day_offset is None: return text - time_match = re.search(r"\b([01]?\d|2[0-3]):([0-5]\d)\b", normalized) - if not time_match: + time_text = self._extract_hhmm_from_text(normalized) + if not time_text: return text - hour = int(time_match.group(1)) - minute = int(time_match.group(2)) + hour_text, minute_text = time_text.split(":") + hour = int(hour_text) + minute = int(minute_text) target_date = datetime.now() + timedelta(days=day_offset) return f"{target_date.strftime('%d/%m/%Y')} {hour:02d}:{minute:02d}" @@ -673,11 +946,63 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin): extracted["revisao_previa_concessionaria"] = reviewed return extracted - def _extract_review_protocol_from_text(self, text: str) -> str | None: - match = re.search(r"\bREV-[A-Z0-9\-]+\b", str(text or "").upper()) - if not match: + def _tokenize_text(self, text: str) -> list[str]: + return [token for token in str(text or "").split() if token] + + def _clean_protocol_token(self, token: str) -> str: + cleaned = str(token or "").strip().upper() + edge_chars = "[](){}<>,.;:!?\"'`" + while cleaned and cleaned[0] in edge_chars: + cleaned = cleaned[1:] + while cleaned and cleaned[-1] in edge_chars: + cleaned = cleaned[:-1] + return cleaned + + def _strip_token_edges(self, token: str) -> str: + cleaned = str(token or "").strip() + edge_chars = "[](){}<>,.;:!?\"'`" + while cleaned and cleaned[0] in edge_chars: + cleaned = cleaned[1:] + while cleaned and cleaned[-1] in edge_chars: + cleaned = cleaned[:-1] + return cleaned + + def _is_valid_protocol_suffix(self, value: str) -> bool: + if not value or len(value) < 4: + return False + return all(char.isalnum() for char in value) + + def _normalize_review_protocol(self, value: str) -> str | None: + candidate = self._clean_protocol_token(value) + if not candidate.startswith("REV-"): return None - return match.group(0) + + parts = candidate.split("-") + if len(parts) != 3: + return None + + prefix, date_part, suffix_part = parts + if prefix != "REV": + return None + if len(date_part) != 8 or not date_part.isdigit(): + return None + try: + datetime.strptime(date_part, "%Y%m%d") + except ValueError: + return None + if not self._is_valid_protocol_suffix(suffix_part): + return None + return f"{prefix}-{date_part}-{suffix_part}" + + def _extract_review_protocol_from_text(self, text: str) -> str | None: + for token in self._tokenize_text(text): + normalized = self._normalize_review_protocol(token) + if normalized: + return normalized + normalized_full = self._normalize_review_protocol(str(text or "")) + if normalized_full: + return normalized_full + return None def _normalize_review_management_fields(self, data) -> dict: if not isinstance(data, dict): @@ -762,6 +1087,20 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin): payload["placa"] = plate def _queue_order(self, user_id: int | None, domain: str, order_message: str) -> None: + self._queue_order_with_memory_seed( + user_id=user_id, + domain=domain, + order_message=order_message, + memory_seed=self._new_tab_memory(user_id=user_id), + ) + + def _queue_order_with_memory_seed( + self, + user_id: int | None, + domain: str, + order_message: str, + memory_seed: dict | None = None, + ) -> None: context = self._get_user_context(user_id) if not context: return @@ -772,7 +1111,7 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin): { "domain": domain, "message": (order_message or "").strip(), - "memory_seed": self._new_tab_memory(user_id=user_id), + "memory_seed": dict(memory_seed or self._new_tab_memory(user_id=user_id)), "created_at": datetime.utcnow().isoformat(), } ) @@ -814,6 +1153,14 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin): if not extracted_orders: extracted_orders = [{"domain": "general", "message": (message or "").strip()}] + if ( + len(extracted_orders) == 2 + and all(order["domain"] != "general" for order in extracted_orders) + and not self._has_open_flow(user_id=user_id, domain=active_domain) + ): + self._store_pending_order_selection(user_id=user_id, orders=extracted_orders) + return message, None, self._render_order_selection_prompt(extracted_orders) + if len(extracted_orders) <= 1: inferred = extracted_orders[0]["domain"] if ( @@ -868,6 +1215,212 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin): lines.append(response) return "\n".join(lines) + def _store_pending_order_selection(self, user_id: int | None, orders: list[dict]) -> None: + context = self._get_user_context(user_id) + if not context: + return + context["pending_order_selection"] = { + "orders": [ + { + "domain": order["domain"], + "message": order["message"], + "memory_seed": self._new_tab_memory(user_id=user_id), + } + for order in orders[:2] + ], + "expires_at": datetime.utcnow() + timedelta(minutes=PENDING_ORDER_SELECTION_TTL_MINUTES), + } + + def _render_order_selection_prompt(self, orders: list[dict]) -> str: + if len(orders) < 2: + return "Qual das acoes voce quer iniciar primeiro?" + first_label = self._describe_order_selection_option(orders[0]) + second_label = self._describe_order_selection_option(orders[1]) + return ( + "Identifiquei duas acoes na sua mensagem:\n" + f"1. {first_label}\n" + f"2. {second_label}\n" + "Qual delas voce quer iniciar primeiro? Se for indiferente, eu escolho." + ) + + def _describe_order_selection_option(self, order: dict) -> str: + domain = str(order.get("domain") or "general") + message = str(order.get("message") or "").strip() + domain_prefix = { + "review": "Revisao", + "sales": "Venda", + "general": "Atendimento", + }.get(domain, "Atendimento") + return f"{domain_prefix}: {message}" + + def _contains_any_term(self, text: str, terms: set[str]) -> bool: + return any(term in text for term in terms) + + def _strip_choice_message(self, text: str) -> str: + cleaned = (text or "").strip() + trailing_chars = ".!?,;:" + while cleaned and cleaned[-1] in trailing_chars: + cleaned = cleaned[:-1].rstrip() + return cleaned + + def _remove_order_selection_reset_prefix(self, message: str) -> str: + raw = (message or "").strip() + normalized = self._normalize_text(raw) + prefixes = ( + "esqueca tudo agora", + "esqueca tudo", + "esquece tudo agora", + "esquece tudo", + ) + for prefix in prefixes: + if normalized.startswith(prefix): + return raw[len(prefix):].lstrip(" ,.:;-") + return raw + + def _is_order_selection_reset_message(self, message: str) -> bool: + normalized = self._normalize_text(message).strip() + reset_terms = { + "esqueca tudo", + "esqueca tudo agora", + "esquece tudo", + "esquece tudo agora", + "ignora isso", + "ignore isso", + "deixa isso", + "deixa pra la", + "deixa para la", + "novo assunto", + "muda de assunto", + "vamos comecar de novo", + "comecar de novo", + "reiniciar", + "resetar conversa", + } + return self._contains_any_term(normalized, reset_terms) + + def _looks_like_fresh_operational_request(self, message: str) -> bool: + normalized = self._normalize_text(message).strip() + if len(normalized) < 15: + return False + operational_terms = { + "agendar", + "revisao", + "cancelar", + "pedido", + "comprar", + "compra", + "carro", + "veiculo", + "remarcar", + "tambem", + } + return self._contains_any_term(normalized, operational_terms) + + def _detect_selected_order_index(self, message: str, orders: list[dict]) -> tuple[int | None, bool]: + normalized = self._strip_choice_message(self._normalize_text(message)) + + indifferent_tokens = { + "tanto faz", + "indiferente", + "qualquer um", + "qualquer uma", + "voce escolhe", + "pode escolher", + "fica a seu criterio", + } + if normalized in indifferent_tokens: + return 0, True + + first_tokens = {"1", "primeiro", "primeira", "opcao 1", "acao 1", "pedido 1"} + second_tokens = {"2", "segundo", "segunda", "opcao 2", "acao 2", "pedido 2"} + if normalized in first_tokens: + return 0, False + if normalized in second_tokens: + return 1, False + + review_keywords = { + "revisao", + "agendamento", + "agendar", + "remarcar", + "pos venda", + } + sales_keywords = { + "venda", + "compra", + "comprar", + "pedido", + "cancelamento", + "cancelar", + "carro", + "veiculo", + } + + review_matches = [index for index, order in enumerate(orders) if order.get("domain") == "review"] + sales_matches = [index for index, order in enumerate(orders) if order.get("domain") == "sales"] + has_review_signal = self._contains_any_term(normalized, review_keywords) + has_sales_signal = self._contains_any_term(normalized, sales_keywords) + + if len(review_matches) == 1 and has_review_signal and not has_sales_signal: + return review_matches[0], False + if len(sales_matches) == 1 and has_sales_signal and not has_review_signal: + return sales_matches[0], False + + return None, False + + async def _try_resolve_pending_order_selection( + self, + message: str, + user_id: int | None, + ) -> str | None: + context = self._get_user_context(user_id) + if not context: + return None + + pending = context.get("pending_order_selection") + if not isinstance(pending, dict): + return None + if pending.get("expires_at") and pending["expires_at"] < datetime.utcnow(): + context["pending_order_selection"] = None + return None + + orders = pending.get("orders") or [] + if len(orders) < 2: + context["pending_order_selection"] = None + return None + + if self._is_order_selection_reset_message(message): + self._clear_user_conversation_state(user_id=user_id) + cleaned_message = self._remove_order_selection_reset_prefix(message) + if not cleaned_message: + return "Tudo bem. Limpei o contexto atual. Pode me dizer o que voce quer fazer agora?" + return await self.handle_message(cleaned_message, user_id=user_id) + + selected_index, auto_selected = self._detect_selected_order_index(message=message, orders=orders) + if selected_index is None: + if self._looks_like_fresh_operational_request(message): + context["pending_order_selection"] = None + return None + return self._render_order_selection_prompt(orders) + + selected_order = orders[selected_index] + remaining_order = orders[1 - selected_index] + context["pending_order_selection"] = None + self._queue_order_with_memory_seed( + user_id=user_id, + domain=remaining_order["domain"], + order_message=remaining_order["message"], + memory_seed=remaining_order.get("memory_seed"), + ) + + intro = ( + f"Vou escolher e comecar por: {self._describe_order_selection_option(selected_order)}" + if auto_selected + else f"Perfeito. Vou comecar por: {self._describe_order_selection_option(selected_order)}" + ) + next_response = await self.handle_message(str(selected_order.get("message") or ""), user_id=user_id) + return f"{intro}\n{next_response}" + def _render_queue_notice(self, queued_count: int) -> str | None: if queued_count <= 0: return None @@ -1036,6 +1589,7 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin): self._reset_pending_order_states(user_id=user_id) context["active_domain"] = target_domain context["generic_memory"] = self._new_tab_memory(user_id=user_id) + context["pending_order_selection"] = None context["pending_switch"] = None def _handle_context_switch( @@ -1060,6 +1614,10 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin): context["pending_switch"] = None return "Perfeito, vamos continuar no fluxo atual." + pending_order_selection = context.get("pending_order_selection") + if pending_order_selection and pending_order_selection.get("expires_at") < datetime.utcnow(): + context["pending_order_selection"] = None + current_domain = context.get("active_domain", "general") target_domain = target_domain_hint if target_domain == "general" or target_domain == current_domain: @@ -1142,10 +1700,7 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin): ) def _extract_time_only(self, text: str) -> str | None: - match = re.search(r"\b([01]?\d|2[0-3]):([0-5]\d)\b", text or "") - if not match: - return None - return f"{int(match.group(1)):02d}:{match.group(2)}" + return self._extract_hhmm_from_text(text) def _merge_date_with_time(self, base_iso: str, new_time_hhmm: str) -> str | None: try: @@ -1168,11 +1723,10 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin): ) -> None: if tool_name != "agendar_revisao" or user_id is None or exc.status_code != 409: return - detail = exc.detail if isinstance(exc.detail, str) else "" - match = re.search(r"ISO:\s*([^)]+)\)", detail) - if not match: + detail = exc.detail if isinstance(exc.detail, dict) else {} + suggested_iso = str(detail.get("suggested_iso") or "").strip() + if not suggested_iso: return - suggested_iso = match.group(1).strip() payload = dict(arguments or {}) if not payload.get("placa"): return @@ -1280,6 +1834,10 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin): detail = exc.detail if isinstance(detail, str): return detail + if isinstance(detail, dict): + message = str(detail.get("message") or "").strip() + if message: + return message return "Nao foi possivel concluir a operacao solicitada." def _fallback_format_tool_result(self, tool_name: str, tool_result) -> str: diff --git a/app/services/orchestration/prompt_builders.py b/app/services/orchestration/prompt_builders.py index 7f7d746..93e8845 100644 --- a/app/services/orchestration/prompt_builders.py +++ b/app/services/orchestration/prompt_builders.py @@ -15,6 +15,11 @@ def build_router_prompt( "Voce e um assistente de concessionaria. " "Sempre que a solicitacao depender de dados operacionais (estoque, validacao de cliente, " "avaliacao de troca, agendamento de revisao, realizacao ou cancelamento de pedido), use a tool correta. " + "Se o usuario pedir para recomecar, esquecer contexto, cancelar fluxo atual, descartar fila pendente " + "ou continuar o proximo pedido, use a tool de orquestracao apropriada. " + "Mensagens de controle da conversa tem prioridade sobre qualquer fluxo em aberto. " + "Se houver um rascunho ativo e o usuario mandar algo como esquecer tudo, cancelar fluxo, descartar pendencias " + "ou continuar, trate isso como comando global e chame a tool correspondente. " "Se faltar parametro obrigatorio para a tool, responda em texto pedindo apenas o que falta.\n\n" f"{user_context}" f"{conversation_context}\n" @@ -30,6 +35,8 @@ def build_force_tool_prompt( user_context = _build_user_context_line(user_id) return ( "Reavalie a mensagem e priorize chamar tool se houver intencao operacional. " + "Considere tambem tools de orquestracao para limpar contexto, cancelar fluxo, descartar fila ou continuar o proximo pedido. " + "Mesmo com fluxo incremental ativo, se a mensagem for de controle global da conversa, a tool de orquestracao deve vencer o rascunho atual. " "Use texto apenas quando faltar dado obrigatorio.\n\n" f"{user_context}" f"{conversation_context}\n" diff --git a/app/services/orchestration/response_formatter.py b/app/services/orchestration/response_formatter.py index 7bffafb..05714bb 100644 --- a/app/services/orchestration/response_formatter.py +++ b/app/services/orchestration/response_formatter.py @@ -134,4 +134,17 @@ def fallback_format_tool_result(tool_name: str, tool_result: Any) -> str: f"Limite: {limite}" ) + if tool_name in { + "limpar_contexto_conversa", + "continuar_proximo_pedido", + "descartar_pedidos_pendentes", + "cancelar_fluxo_atual", + }: + if isinstance(tool_result, str): + return tool_result + if isinstance(tool_result, dict): + message = str(tool_result.get("message") or "").strip() + if message: + return message + return "Operacao concluida com sucesso." diff --git a/app/services/tools/handlers.py b/app/services/tools/handlers.py index aa6c325..0ce9947 100644 --- a/app/services/tools/handlers.py +++ b/app/services/tools/handlers.py @@ -8,7 +8,8 @@ from fastapi import HTTPException from sqlalchemy import func from app.db.mock_database import SessionMockLocal -from app.db.mock_models import Customer, Order, ReviewSchedule, Vehicle +from app.db.mock_models import Customer, Order, ReviewSchedule, User, Vehicle +from app.services.user.mock_customer_service import hydrate_mock_customer_from_cpf # Nesse arquivo eu faço a normalização dos dados para persisti-los no DB @@ -269,6 +270,31 @@ def _find_next_available_review_slot( return None +def _build_review_conflict_detail( + requested_dt: datetime, + suggested_dt: Optional[datetime] = None, +) -> Dict[str, Any]: + if suggested_dt is not None: + return { + "code": "review_schedule_conflict", + "message": ( + f"O horario {_format_datetime_pt_br(requested_dt)} ja esta ocupado. " + f"Posso agendar em {_format_datetime_pt_br(suggested_dt)}." + ), + "requested_iso": requested_dt.isoformat(), + "suggested_iso": suggested_dt.isoformat(), + } + return { + "code": "review_schedule_conflict", + "message": ( + f"O horario {_format_datetime_pt_br(requested_dt)} ja esta ocupado e nao encontrei " + "disponibilidade nas proximas 8 horas." + ), + "requested_iso": requested_dt.isoformat(), + "suggested_iso": None, + } + + async def agendar_revisao( placa: str, data_hora: str, @@ -329,18 +355,14 @@ async def agendar_revisao( if proximo_horario: raise HTTPException( status_code=409, - detail=( - f"O horario {_format_datetime_pt_br(dt)} ja esta ocupado. " - f"Posso agendar em {_format_datetime_pt_br(proximo_horario)} " - f"(ISO: {proximo_horario.isoformat()})." + detail=_build_review_conflict_detail( + requested_dt=dt, + suggested_dt=proximo_horario, ), ) raise HTTPException( status_code=409, - detail=( - f"O horario {_format_datetime_pt_br(dt)} ja esta ocupado e nao encontrei " - "disponibilidade nas proximas 8 horas." - ), + detail=_build_review_conflict_detail(requested_dt=dt), ) existente = db.query(ReviewSchedule).filter(ReviewSchedule.protocolo == protocolo).first() @@ -526,18 +548,14 @@ async def editar_data_revisao( if proximo_horario: raise HTTPException( status_code=409, - detail=( - f"O horario {_format_datetime_pt_br(nova_data)} ja esta ocupado. " - f"Sugestao: {_format_datetime_pt_br(proximo_horario)} " - f"(ISO: {proximo_horario.isoformat()})." + detail=_build_review_conflict_detail( + requested_dt=nova_data, + suggested_dt=proximo_horario, ), ) raise HTTPException( status_code=409, - detail=( - f"O horario {_format_datetime_pt_br(nova_data)} ja esta ocupado e nao encontrei " - "disponibilidade nas proximas 8 horas." - ), + detail=_build_review_conflict_detail(requested_dt=nova_data), ) agendamento.data_hora = nova_data @@ -612,6 +630,7 @@ async def cancelar_pedido(numero_pedido: str, motivo: str, user_id: Optional[int async def realizar_pedido(cpf: str, valor_veiculo: float, user_id: Optional[int] = None) -> Dict[str, Any]: """Cria um novo pedido de compra quando o cliente estiver aprovado para o valor informado.""" cpf_norm = normalize_cpf(cpf) + await hydrate_mock_customer_from_cpf(cpf=cpf_norm, user_id=user_id) avaliacao = await validar_cliente_venda(cpf=cpf_norm, valor_veiculo=valor_veiculo) if not avaliacao.get("aprovado"): raise HTTPException( @@ -625,6 +644,11 @@ async def realizar_pedido(cpf: str, valor_veiculo: float, user_id: Optional[int] numero_pedido = f"PED-{datetime.utcnow().strftime('%Y%m%d%H%M%S')}-{uuid4().hex[:6].upper()}" db = SessionMockLocal() try: + if user_id is not None: + user = db.query(User).filter(User.id == user_id).first() + if user and user.cpf != cpf_norm: + user.cpf = cpf_norm + pedido = Order( numero_pedido=numero_pedido, user_id=user_id, diff --git a/app/services/tools/tool_registry.py b/app/services/tools/tool_registry.py index bd87544..afce663 100644 --- a/app/services/tools/tool_registry.py +++ b/app/services/tools/tool_registry.py @@ -33,13 +33,16 @@ HANDLERS: Dict[str, Callable] = { class ToolRegistry: - def __init__(self, db: Session): + def __init__(self, db: Session, extra_handlers: Dict[str, Callable] | None = None): """Carrega tools do banco e registra apenas as que possuem handler conhecido.""" self._tools = [] + available_handlers = dict(HANDLERS) + if extra_handlers: + available_handlers.update(extra_handlers) repo = ToolRepository(db) db_tools = repo.get_all() for db_tool in db_tools: - handler = HANDLERS.get(db_tool.name) + handler = available_handlers.get(db_tool.name) if not handler: continue self.register_tool( diff --git a/app/services/user/mock_customer_service.py b/app/services/user/mock_customer_service.py new file mode 100644 index 0000000..f32371a --- /dev/null +++ b/app/services/user/mock_customer_service.py @@ -0,0 +1,88 @@ +import hashlib + +from app.db.mock_database import SessionMockLocal +from app.db.mock_models import Customer, User + + +MOCK_CUSTOMER_NAMES = [ + "Ana Souza", + "Bruno Lima", + "Carla Mendes", + "Diego Santos", + "Eduarda Alves", + "Felipe Rocha", + "Gabriela Costa", + "Henrique Martins", + "Isabela Ferreira", + "Joao Ribeiro", +] + + +def _normalize_cpf(value: str) -> str: + return "".join(char for char in str(value or "") if char.isdigit()) + + +def _stable_int(seed_text: str) -> int: + digest = hashlib.sha256(seed_text.encode("utf-8")).hexdigest() + return int(digest[:16], 16) + + +def _build_mock_customer_profile(cpf: str) -> dict: + entropy = _stable_int(cpf) + name = MOCK_CUSTOMER_NAMES[entropy % len(MOCK_CUSTOMER_NAMES)] + return { + "cpf": cpf, + "nome": f"{name} {_stable_int(cpf + '-suffix') % 900 + 100}", + "score": int(350 + (entropy % 500)), + "limite_credito": float(35_000 + (entropy % 140_000)), + "possui_restricao": (entropy % 9 == 0), + } + + +async def hydrate_mock_customer_from_cpf( + cpf: str, + user_id: int | None = None, +) -> dict: + """ + Simula uma consulta externa por CPF e garante dados ficticios no banco mock. + + Esta funcao existe apenas para a fase local/mock. Quando a API real entrar, + ela deve ser substituida/removida. + """ + cpf_norm = _normalize_cpf(cpf) + if len(cpf_norm) != 11: + raise ValueError("CPF invalido para hidratacao mock.") + + db = SessionMockLocal() + try: + customer = db.query(Customer).filter(Customer.cpf == cpf_norm).first() + created_customer = False + if not customer: + customer = Customer(**_build_mock_customer_profile(cpf_norm)) + db.add(customer) + created_customer = True + + linked_user = False + user = None + if user_id is not None: + user = db.query(User).filter(User.id == user_id).first() + if user and user.cpf != cpf_norm: + user.cpf = cpf_norm + linked_user = True + + db.commit() + db.refresh(customer) + if user is not None: + db.refresh(user) + + return { + "cpf": customer.cpf, + "nome": customer.nome, + "score": int(customer.score), + "limite_credito": float(customer.limite_credito), + "possui_restricao": bool(customer.possui_restricao), + "customer_created": created_customer, + "user_linked": linked_user, + } + finally: + db.close()