diff --git a/app/repositories/user_repository.py b/app/repositories/user_repository.py index 6947b5e..a7ea49a 100644 --- a/app/repositories/user_repository.py +++ b/app/repositories/user_repository.py @@ -8,10 +8,6 @@ class UserRepository: """Inicializa o repositorio de usuarios com a sessao ativa.""" self.db = db - def get_by_id(self, user_id: int): - """Busca usuario por ID interno.""" - return self.db.query(User).filter(User.id == user_id).first() - def get_by_channel_external_id(self, channel: str, external_id: str): """Busca usuario por canal e identificador externo.""" return ( diff --git a/app/services/orquestrador_service.py b/app/services/orquestrador_service.py index c6d7e08..2d702fd 100644 --- a/app/services/orquestrador_service.py +++ b/app/services/orquestrador_service.py @@ -10,6 +10,9 @@ from app.services.tool_registry import ToolRegistry class OrquestradorService: + USER_CONTEXTS: dict[int, dict] = {} + USER_CONTEXT_TTL_MINUTES = 60 + # Memoria temporaria de confirmacao quando a API sugere novo horario (conflito 409). PENDING_REVIEW_CONFIRMATIONS: dict[int, dict] = {} PENDING_REVIEW_TTL_MINUTES = 30 # Pode ser alterado por uma variavel de configuracao caso o sistema cresca @@ -54,18 +57,44 @@ class OrquestradorService: async def handle_message(self, message: str, user_id: int | None = None) -> str: """Processa mensagem, executa tool quando necessario e retorna resposta final.""" - routing_message = self._resolve_primary_intent_message(message=message, user_id=user_id) + async def finish(response: str, queue_notice: str | None = None) -> str: + composed = self._compose_order_aware_response( + response=response, + user_id=user_id, + queue_notice=queue_notice, + ) + return await self._maybe_auto_advance_next_order( + base_response=composed, + user_id=user_id, + ) + + self._upsert_user_context(user_id=user_id) + self._capture_generic_memory(message=message, user_id=user_id) + + ( + routing_message, + queue_notice, + queue_early_response, + ) = self._prepare_message_for_single_order(message=message, user_id=user_id) + if queue_early_response: + return await finish(queue_early_response, queue_notice=queue_notice) + + context_switch_response = self._handle_context_switch(message=routing_message, user_id=user_id) + if context_switch_response: + return await finish(context_switch_response, queue_notice=queue_notice) + + self._update_active_domain(message=routing_message, user_id=user_id) # 1) Se houver sugestao pendente de horario e o usuario confirmou ("pode/sim"), # agenda direto no horario sugerido. - confirmation_response = await self._try_confirm_pending_review(message=message, user_id=user_id) + confirmation_response = await self._try_confirm_pending_review(message=routing_message, user_id=user_id) if confirmation_response: - return confirmation_response + return await finish(confirmation_response, queue_notice=queue_notice) # 2) Fluxo de coleta incremental de dados da revisao (slot filling). # Evita pedir tudo de novo quando o usuario responde em partes. - review_response = await self._try_collect_and_schedule_review(message=message, user_id=user_id) + review_response = await self._try_collect_and_schedule_review(message=routing_message, user_id=user_id) if review_response: - return review_response + return await finish(review_response, queue_notice=queue_notice) tools = self.registry.get_tools() @@ -97,14 +126,17 @@ class OrquestradorService: exc=exc, user_id=user_id, ) - return self._http_exception_detail(exc) + return await finish(self._http_exception_detail(exc), queue_notice=queue_notice) if self._should_use_deterministic_response(tool_name): - return self._fallback_format_tool_result(tool_name, tool_result) + return await finish( + self._fallback_format_tool_result(tool_name, tool_result), + queue_notice=queue_notice, + ) final_response = await self.llm.generate_response( message=self._build_result_prompt( - user_message=message, + user_message=routing_message, user_id=user_id, tool_name=tool_name, tool_result=tool_result, @@ -113,14 +145,23 @@ class OrquestradorService: ) text = (final_response.get("response") or "").strip() if self._is_low_value_response(text): - return self._fallback_format_tool_result(tool_name, tool_result) + return await finish( + self._fallback_format_tool_result(tool_name, tool_result), + queue_notice=queue_notice, + ) - return text or self._fallback_format_tool_result(tool_name, tool_result) + return await finish( + text or self._fallback_format_tool_result(tool_name, tool_result), + queue_notice=queue_notice, + ) text = (llm_result.get("response") or "").strip() if self._is_low_value_response(text): - return "Entendi. Pode me dar mais detalhes para eu consultar corretamente?" - return text + return await finish( + "Entendi. Pode me dar mais detalhes para eu consultar corretamente?", + queue_notice=queue_notice, + ) + return await finish(text, queue_notice=queue_notice) def _reset_pending_review_states(self, user_id: int | None) -> None: if user_id is None: @@ -128,46 +169,369 @@ class OrquestradorService: self.PENDING_REVIEW_DRAFTS.pop(user_id, None) self.PENDING_REVIEW_CONFIRMATIONS.pop(user_id, None) - def _is_purchase_intent(self, text: str) -> bool: + def _upsert_user_context(self, user_id: int | None) -> None: + if user_id is None: + return + now = datetime.utcnow() + context = self.USER_CONTEXTS.get(user_id) + if context and context["expires_at"] >= now: + context["expires_at"] = now + timedelta(minutes=self.USER_CONTEXT_TTL_MINUTES) + return + self.USER_CONTEXTS[user_id] = { + "active_domain": "general", + "generic_memory": {}, + "order_queue": [], + "pending_switch": None, + "expires_at": now + timedelta(minutes=self.USER_CONTEXT_TTL_MINUTES), + } + + def _get_user_context(self, user_id: int | None) -> dict | None: + if user_id is None: + return None + context = self.USER_CONTEXTS.get(user_id) + if not context: + return None + if context["expires_at"] < datetime.utcnow(): + self.USER_CONTEXTS.pop(user_id, None) + return None + return context + + def _extract_generic_memory_fields(self, message: str) -> dict: + extracted: dict = {} + text = message or "" lowered = self._normalize_text(text) - keywords = ( + + plate_match = re.search( + r"\b([A-Za-z]{3}[0-9][A-Za-z0-9][0-9]{2}|[A-Za-z]{3}[0-9]{4})\b", + text, + ) + if plate_match: + extracted["placa"] = plate_match.group(1).upper() + + budget_match = re.search( + r"\bate\s*(?:r\$)?\s*(\d{1,3}(?:[.\s]\d{3})+|\d+)\s*(?:mil)?\b", + lowered, + ) + if budget_match: + digits = re.sub(r"[.\s]", "", budget_match.group(1)) + if digits.isdigit(): + value = int(digits) + if "mil" in lowered[budget_match.start(): budget_match.end()]: + value *= 1000 + extracted["orcamento_max"] = value + + vehicle_profile = [] + for marker in ("suv", "sedan", "hatch", "pickup"): + if marker in lowered: + vehicle_profile.append(marker) + if vehicle_profile: + extracted["perfil_veiculo"] = vehicle_profile + + return extracted + + def _capture_generic_memory(self, message: str, user_id: int | None) -> None: + context = self._get_user_context(user_id) + if not context: + return + fields = self._extract_generic_memory_fields(message) + if fields: + context["generic_memory"].update(fields) + + def _queue_order(self, user_id: int | None, domain: str, order_message: str) -> None: + context = self._get_user_context(user_id) + if not context: + return + if domain == "general": + return + queue = context.setdefault("order_queue", []) + queue.append( + { + "domain": domain, + "message": (order_message or "").strip(), + "created_at": datetime.utcnow().isoformat(), + } + ) + + def _pop_next_order(self, user_id: int | None) -> dict | None: + context = self._get_user_context(user_id) + if not context: + return None + queue = context.setdefault("order_queue", []) + if not queue: + return None + return queue.pop(0) + + def _extract_order_requests(self, message: str) -> list[dict]: + text = (message or "").strip() + if not text: + return [] + + lowered = self._normalize_text(text) + domain_patterns = { + "review": ( + r"\brevis[a-z]*\b", + r"\bagendar\b", + r"\bmarcar\b", + r"\bremarcar\b", + r"\bagendamento\b", + ), + "sales": ( + r"\bcomprar\b", + r"\bcompra\b", + r"\bcarro\b", + r"\bveiculo\b", + r"\bestoque\b", + r"\bpedido\b", + r"\bfinanci[a-z]*\b", + ), + } + + matches: list[tuple[int, str]] = [] + for domain, patterns in domain_patterns.items(): + for pattern in patterns: + for hit in re.finditer(pattern, lowered): + matches.append((hit.start(), domain)) + + if not matches: + inferred = self._infer_domain(text) + return [{"domain": inferred, "message": text}] if inferred != "general" else [] + + matches.sort(key=lambda item: item[0]) + transitions: list[tuple[int, str]] = [] + for position, domain in matches: + if not transitions or transitions[-1][1] != domain: + transitions.append((position, domain)) + + if len(transitions) <= 1: + inferred = self._infer_domain(text) + return [{"domain": inferred, "message": text}] if inferred != "general" else [] + + requests: list[dict] = [] + for idx, (start, domain) in enumerate(transitions): + segment_start = 0 if idx == 0 else start + segment_end = transitions[idx + 1][0] if idx + 1 < len(transitions) else len(text) + segment = text[segment_start:segment_end].strip(" ,;.") + if segment: + requests.append({"domain": domain, "message": segment}) + + return requests + + def _prepare_message_for_single_order(self, message: str, user_id: int | None) -> tuple[str, str | None, str | None]: + context = self._get_user_context(user_id) + if not context: + return message, None, None + + queue_notice = None + active_domain = context.get("active_domain", "general") + + extracted_orders = self._extract_order_requests(message) + if len(extracted_orders) <= 1: + inferred = self._infer_domain(message) + if ( + inferred != "general" + and inferred != active_domain + and self._has_open_flow(user_id=user_id, domain=active_domain) + ): + self._queue_order(user_id=user_id, domain=inferred, order_message=message) + return ( + message, + None, + self._render_open_flow_prompt(user_id=user_id, domain=active_domain), + ) + return message, None, None + + if self._has_open_flow(user_id=user_id, domain=active_domain): + for queued in extracted_orders: + if queued["domain"] != active_domain: + self._queue_order(user_id=user_id, domain=queued["domain"], order_message=queued["message"]) + return ( + message, + None, + self._render_open_flow_prompt(user_id=user_id, domain=active_domain), + ) + + first = extracted_orders[0] + for queued in extracted_orders[1:]: + self._queue_order(user_id=user_id, domain=queued["domain"], order_message=queued["message"]) + context["active_domain"] = first["domain"] + + queue_notice = None + return first["message"], queue_notice, None + + def _compose_order_aware_response(self, response: str, user_id: int | None, queue_notice: str | None = None) -> str: + lines = [] + if queue_notice: + lines.append(queue_notice) + lines.append(response) + return "\n".join(lines) + + def _render_open_flow_prompt(self, user_id: int | None, domain: str) -> str: + if domain == "review" and user_id is not None: + draft = self.PENDING_REVIEW_DRAFTS.get(user_id) + if draft: + missing = [field for field in self.REVIEW_REQUIRED_FIELDS if field not in draft.get("payload", {})] + if missing: + return self._render_missing_review_fields_prompt(missing) + + pending = self.PENDING_REVIEW_CONFIRMATIONS.get(user_id) + if pending: + return "Antes de mudar de assunto, me confirme se podemos concluir seu agendamento de revisao." + + return "Vamos concluir este assunto primeiro e ja sigo com o proximo em seguida." + + def _build_next_order_transition(self, domain: str) -> str: + if domain == "sales": + return "Agora, sobre a compra do veiculo:" + if domain == "review": + return "Agora, sobre o agendamento da revisao:" + return "Agora, sobre o proximo assunto:" + + async def _maybe_auto_advance_next_order(self, base_response: str, user_id: int | None) -> str: + context = self._get_user_context(user_id) + if not context: + return base_response + + if context.get("pending_switch"): + return base_response + + active_domain = context.get("active_domain", "general") + if self._has_open_flow(user_id=user_id, domain=active_domain): + return base_response + + next_order = self._pop_next_order(user_id=user_id) + if not next_order: + return base_response + + context["active_domain"] = next_order["domain"] + next_response = await self.handle_message(next_order["message"], user_id=user_id) + transition = self._build_next_order_transition(next_order["domain"]) + return f"{base_response}\n\n{transition}\n{next_response}" + + def _infer_domain(self, message: str) -> str: + lowered = self._normalize_text(message) + + review_keywords = ( + "revisao", + "agendamento", + "agendar", + "remarcar", + "cancelar agendamento", + "placa", + ) + sales_keywords = ( "comprar", "compra", - "carro", - "carros", "veiculo", - "veiculos", + "carro", "estoque", + "financi", + "pedido", ) - return any(k in lowered for k in keywords) - def _has_review_protocol(self, text: str) -> bool: - return re.search(r"\brev-\d{8}-[a-z0-9]+\b", (text or "").lower()) is not None + has_review = any(k in lowered for k in review_keywords) + has_sales = any(k in lowered for k in sales_keywords) + if has_review and not has_sales: + return "review" + if has_sales and not has_review: + return "sales" + return "general" - def _resolve_primary_intent_message(self, message: str, user_id: int | None) -> str: - # Em mensagens mistas ("cancele ... agora quero comprar"), prioriza compra - # quando nao ha protocolo explicito de revisao. - if not self._is_purchase_intent(message): - return message - if not self._is_review_management_intent(message): - return message - if self._has_review_protocol(message): - return message + def _is_context_switch_confirmation(self, message: str) -> bool: + return self._is_affirmative_message(message) or self._is_negative_message(message) - lowered = self._normalize_text(message) - buy_markers = ("agora quero comprar", "quero comprar", "comprar", "compra") - idx = -1 - for marker in buy_markers: - pos = lowered.rfind(marker) - if pos > idx: - idx = pos - - # Se identificar trecho de compra, usa apenas ele para rotear. - if idx >= 0: + def _has_open_flow(self, user_id: int | None, domain: str) -> bool: + if user_id is None: + return False + if domain == "review": + return bool( + self.PENDING_REVIEW_DRAFTS.get(user_id) + or self.PENDING_REVIEW_CONFIRMATIONS.get(user_id) + ) + return False + + def _apply_domain_switch(self, user_id: int | None, target_domain: str) -> None: + context = self._get_user_context(user_id) + if not context: + return + previous_domain = context.get("active_domain", "general") + if previous_domain == "review": self._reset_pending_review_states(user_id=user_id) - return (message or "")[idx:].strip() or message + context["active_domain"] = target_domain + context["pending_switch"] = None + + def _handle_context_switch(self, message: str, user_id: int | None) -> str | None: + context = self._get_user_context(user_id) + if not context: + return None + + pending_switch = context.get("pending_switch") + if pending_switch: + if pending_switch["expires_at"] < datetime.utcnow(): + context["pending_switch"] = None + elif self._is_context_switch_confirmation(message): + if self._is_affirmative_message(message): + target_domain = pending_switch["target_domain"] + self._apply_domain_switch(user_id=user_id, target_domain=target_domain) + return self._render_context_switched_message(target_domain=target_domain) + context["pending_switch"] = None + return "Perfeito, vamos continuar no fluxo atual." + + current_domain = context.get("active_domain", "general") + target_domain = self._infer_domain(message) + if target_domain == "general" or target_domain == current_domain: + return None + if not self._has_open_flow(user_id=user_id, domain=current_domain): + return None + + context["pending_switch"] = { + "source_domain": current_domain, + "target_domain": target_domain, + "expires_at": datetime.utcnow() + timedelta(minutes=15), + } + return self._render_context_switch_confirmation( + source_domain=current_domain, + target_domain=target_domain, + ) + + def _update_active_domain(self, message: str, user_id: int | None) -> None: + context = self._get_user_context(user_id) + if not context: + return + detected = self._infer_domain(message) + if detected != "general": + context["active_domain"] = detected + + def _domain_label(self, domain: str) -> str: + labels = { + "review": "agendamento de revisao", + "sales": "compra de veiculo", + "general": "atendimento geral", + } + return labels.get(domain, "atendimento") + + def _render_context_switch_confirmation(self, source_domain: str, target_domain: str) -> str: + return ( + f"Entendi que voce quer sair de {self._domain_label(source_domain)} " + f"e ir para {self._domain_label(target_domain)}. Tem certeza?" + ) + + def _render_context_switched_message(self, target_domain: str) -> str: + return f"Certo, contexto anterior encerrado. Vamos seguir com {self._domain_label(target_domain)}." + + def _build_context_summary(self, user_id: int | None) -> str: + context = self._get_user_context(user_id) + if not context: + return "Contexto de conversa: sem contexto ativo." - return message + domain = context.get("active_domain", "general") + memory = context.get("generic_memory", {}) + order_queue = context.get("order_queue", []) + summary = [f"Contexto de conversa ativo: {self._domain_label(domain)}."] + if memory: + summary.append(f"Memoria generica temporaria: {memory}.") + if order_queue: + summary.append(f"Fila de pedidos pendentes: {len(order_queue)}.") + return " ".join(summary) def _should_use_deterministic_response(self, tool_name: str) -> bool: return tool_name in self.DETERMINISTIC_RESPONSE_TOOLS @@ -390,7 +754,7 @@ class OrquestradorService: def _is_affirmative_message(self, text: str) -> bool: normalized = self._normalize_text(text).strip() normalized = re.sub(r"[.!?,;:]+$", "", normalized) - return normalized in {"sim", "pode", "ok", "confirmo", "aceito", "fechado", "pode sim"} + return normalized in {"sim", "pode", "ok", "confirmo", "aceito", "fechado", "pode sim", "tenho", "tenho sim"} def _is_negative_message(self, text: str) -> bool: normalized = self._normalize_text(text).strip() @@ -529,21 +893,25 @@ class OrquestradorService: def _build_router_prompt(self, user_message: str, user_id: int | None) -> str: user_context = f"Contexto de usuario autenticado: user_id={user_id}.\n" if user_id else "" + conversation_context = self._build_context_summary(user_id=user_id) return ( "Voce e um assistente de concessionaria. " "Sempre que a solicitacao depender de dados operacionais (estoque, validacao de cliente, " "avaliacao de troca, agendamento de revisao, realizacao ou cancelamento de pedido), use a tool correta. " "Se faltar parametro obrigatorio para a tool, responda em texto pedindo apenas o que falta.\n\n" f"{user_context}" + f"{conversation_context}\n" f"Mensagem do usuario: {user_message}" ) def _build_force_tool_prompt(self, user_message: str, user_id: int | None) -> str: user_context = f"Contexto de usuario autenticado: user_id={user_id}.\n" if user_id else "" + conversation_context = self._build_context_summary(user_id=user_id) return ( "Reavalie a mensagem e priorize chamar tool se houver intencao operacional. " "Use texto apenas quando faltar dado obrigatorio.\n\n" f"{user_context}" + f"{conversation_context}\n" f"Mensagem do usuario: {user_message}" ) @@ -555,11 +923,13 @@ class OrquestradorService: tool_result, ) -> str: user_context = f"Contexto de usuario autenticado: user_id={user_id}.\n" if user_id else "" + conversation_context = self._build_context_summary(user_id=user_id) return ( "Responda ao usuario de forma objetiva usando o resultado da tool abaixo. " "Nao invente dados. Se a lista vier vazia, diga explicitamente que nao encontrou resultados. " "Retorne texto puro sem markdown, sem asteriscos, sem emojis e com linhas curtas.\n\n" f"{user_context}" + f"{conversation_context}\n" f"Pergunta original: {user_message}\n" f"Tool executada: {tool_name}\n" f"Resultado da tool: {tool_result}"