♻️ refactor(orquestrador): consolidar contexto multiassunto com continuidade automática

- evita interromper fluxo ativo ao receber mensagem com dois assuntos

- enfileira assuntos secundarios e avanca automaticamente apos concluir o atual

- remove metodos legados sem uso no orquestrador (roteamento antigo por regex)

- remove metodo sem uso em UserRepository para reduzir codigo morto
main
parent af513f5583
commit 29faec5464

@ -8,10 +8,6 @@ class UserRepository:
"""Inicializa o repositorio de usuarios com a sessao ativa."""
self.db = db
def get_by_id(self, user_id: int):
"""Busca usuario por ID interno."""
return self.db.query(User).filter(User.id == user_id).first()
def get_by_channel_external_id(self, channel: str, external_id: str):
"""Busca usuario por canal e identificador externo."""
return (

@ -10,6 +10,9 @@ from app.services.tool_registry import ToolRegistry
class OrquestradorService:
USER_CONTEXTS: dict[int, dict] = {}
USER_CONTEXT_TTL_MINUTES = 60
# Memoria temporaria de confirmacao quando a API sugere novo horario (conflito 409).
PENDING_REVIEW_CONFIRMATIONS: dict[int, dict] = {}
PENDING_REVIEW_TTL_MINUTES = 30 # Pode ser alterado por uma variavel de configuracao caso o sistema cresca
@ -54,18 +57,44 @@ class OrquestradorService:
async def handle_message(self, message: str, user_id: int | None = None) -> str:
"""Processa mensagem, executa tool quando necessario e retorna resposta final."""
routing_message = self._resolve_primary_intent_message(message=message, user_id=user_id)
async def finish(response: str, queue_notice: str | None = None) -> str:
composed = self._compose_order_aware_response(
response=response,
user_id=user_id,
queue_notice=queue_notice,
)
return await self._maybe_auto_advance_next_order(
base_response=composed,
user_id=user_id,
)
self._upsert_user_context(user_id=user_id)
self._capture_generic_memory(message=message, user_id=user_id)
(
routing_message,
queue_notice,
queue_early_response,
) = self._prepare_message_for_single_order(message=message, user_id=user_id)
if queue_early_response:
return await finish(queue_early_response, queue_notice=queue_notice)
context_switch_response = self._handle_context_switch(message=routing_message, user_id=user_id)
if context_switch_response:
return await finish(context_switch_response, queue_notice=queue_notice)
self._update_active_domain(message=routing_message, user_id=user_id)
# 1) Se houver sugestao pendente de horario e o usuario confirmou ("pode/sim"),
# agenda direto no horario sugerido.
confirmation_response = await self._try_confirm_pending_review(message=message, user_id=user_id)
confirmation_response = await self._try_confirm_pending_review(message=routing_message, user_id=user_id)
if confirmation_response:
return confirmation_response
return await finish(confirmation_response, queue_notice=queue_notice)
# 2) Fluxo de coleta incremental de dados da revisao (slot filling).
# Evita pedir tudo de novo quando o usuario responde em partes.
review_response = await self._try_collect_and_schedule_review(message=message, user_id=user_id)
review_response = await self._try_collect_and_schedule_review(message=routing_message, user_id=user_id)
if review_response:
return review_response
return await finish(review_response, queue_notice=queue_notice)
tools = self.registry.get_tools()
@ -97,14 +126,17 @@ class OrquestradorService:
exc=exc,
user_id=user_id,
)
return self._http_exception_detail(exc)
return await finish(self._http_exception_detail(exc), queue_notice=queue_notice)
if self._should_use_deterministic_response(tool_name):
return self._fallback_format_tool_result(tool_name, tool_result)
return await finish(
self._fallback_format_tool_result(tool_name, tool_result),
queue_notice=queue_notice,
)
final_response = await self.llm.generate_response(
message=self._build_result_prompt(
user_message=message,
user_message=routing_message,
user_id=user_id,
tool_name=tool_name,
tool_result=tool_result,
@ -113,14 +145,23 @@ class OrquestradorService:
)
text = (final_response.get("response") or "").strip()
if self._is_low_value_response(text):
return self._fallback_format_tool_result(tool_name, tool_result)
return await finish(
self._fallback_format_tool_result(tool_name, tool_result),
queue_notice=queue_notice,
)
return text or self._fallback_format_tool_result(tool_name, tool_result)
return await finish(
text or self._fallback_format_tool_result(tool_name, tool_result),
queue_notice=queue_notice,
)
text = (llm_result.get("response") or "").strip()
if self._is_low_value_response(text):
return "Entendi. Pode me dar mais detalhes para eu consultar corretamente?"
return text
return await finish(
"Entendi. Pode me dar mais detalhes para eu consultar corretamente?",
queue_notice=queue_notice,
)
return await finish(text, queue_notice=queue_notice)
def _reset_pending_review_states(self, user_id: int | None) -> None:
if user_id is None:
@ -128,46 +169,369 @@ class OrquestradorService:
self.PENDING_REVIEW_DRAFTS.pop(user_id, None)
self.PENDING_REVIEW_CONFIRMATIONS.pop(user_id, None)
def _is_purchase_intent(self, text: str) -> bool:
def _upsert_user_context(self, user_id: int | None) -> None:
if user_id is None:
return
now = datetime.utcnow()
context = self.USER_CONTEXTS.get(user_id)
if context and context["expires_at"] >= now:
context["expires_at"] = now + timedelta(minutes=self.USER_CONTEXT_TTL_MINUTES)
return
self.USER_CONTEXTS[user_id] = {
"active_domain": "general",
"generic_memory": {},
"order_queue": [],
"pending_switch": None,
"expires_at": now + timedelta(minutes=self.USER_CONTEXT_TTL_MINUTES),
}
def _get_user_context(self, user_id: int | None) -> dict | None:
if user_id is None:
return None
context = self.USER_CONTEXTS.get(user_id)
if not context:
return None
if context["expires_at"] < datetime.utcnow():
self.USER_CONTEXTS.pop(user_id, None)
return None
return context
def _extract_generic_memory_fields(self, message: str) -> dict:
extracted: dict = {}
text = message or ""
lowered = self._normalize_text(text)
keywords = (
plate_match = re.search(
r"\b([A-Za-z]{3}[0-9][A-Za-z0-9][0-9]{2}|[A-Za-z]{3}[0-9]{4})\b",
text,
)
if plate_match:
extracted["placa"] = plate_match.group(1).upper()
budget_match = re.search(
r"\bate\s*(?:r\$)?\s*(\d{1,3}(?:[.\s]\d{3})+|\d+)\s*(?:mil)?\b",
lowered,
)
if budget_match:
digits = re.sub(r"[.\s]", "", budget_match.group(1))
if digits.isdigit():
value = int(digits)
if "mil" in lowered[budget_match.start(): budget_match.end()]:
value *= 1000
extracted["orcamento_max"] = value
vehicle_profile = []
for marker in ("suv", "sedan", "hatch", "pickup"):
if marker in lowered:
vehicle_profile.append(marker)
if vehicle_profile:
extracted["perfil_veiculo"] = vehicle_profile
return extracted
def _capture_generic_memory(self, message: str, user_id: int | None) -> None:
context = self._get_user_context(user_id)
if not context:
return
fields = self._extract_generic_memory_fields(message)
if fields:
context["generic_memory"].update(fields)
def _queue_order(self, user_id: int | None, domain: str, order_message: str) -> None:
context = self._get_user_context(user_id)
if not context:
return
if domain == "general":
return
queue = context.setdefault("order_queue", [])
queue.append(
{
"domain": domain,
"message": (order_message or "").strip(),
"created_at": datetime.utcnow().isoformat(),
}
)
def _pop_next_order(self, user_id: int | None) -> dict | None:
context = self._get_user_context(user_id)
if not context:
return None
queue = context.setdefault("order_queue", [])
if not queue:
return None
return queue.pop(0)
def _extract_order_requests(self, message: str) -> list[dict]:
text = (message or "").strip()
if not text:
return []
lowered = self._normalize_text(text)
domain_patterns = {
"review": (
r"\brevis[a-z]*\b",
r"\bagendar\b",
r"\bmarcar\b",
r"\bremarcar\b",
r"\bagendamento\b",
),
"sales": (
r"\bcomprar\b",
r"\bcompra\b",
r"\bcarro\b",
r"\bveiculo\b",
r"\bestoque\b",
r"\bpedido\b",
r"\bfinanci[a-z]*\b",
),
}
matches: list[tuple[int, str]] = []
for domain, patterns in domain_patterns.items():
for pattern in patterns:
for hit in re.finditer(pattern, lowered):
matches.append((hit.start(), domain))
if not matches:
inferred = self._infer_domain(text)
return [{"domain": inferred, "message": text}] if inferred != "general" else []
matches.sort(key=lambda item: item[0])
transitions: list[tuple[int, str]] = []
for position, domain in matches:
if not transitions or transitions[-1][1] != domain:
transitions.append((position, domain))
if len(transitions) <= 1:
inferred = self._infer_domain(text)
return [{"domain": inferred, "message": text}] if inferred != "general" else []
requests: list[dict] = []
for idx, (start, domain) in enumerate(transitions):
segment_start = 0 if idx == 0 else start
segment_end = transitions[idx + 1][0] if idx + 1 < len(transitions) else len(text)
segment = text[segment_start:segment_end].strip(" ,;.")
if segment:
requests.append({"domain": domain, "message": segment})
return requests
def _prepare_message_for_single_order(self, message: str, user_id: int | None) -> tuple[str, str | None, str | None]:
context = self._get_user_context(user_id)
if not context:
return message, None, None
queue_notice = None
active_domain = context.get("active_domain", "general")
extracted_orders = self._extract_order_requests(message)
if len(extracted_orders) <= 1:
inferred = self._infer_domain(message)
if (
inferred != "general"
and inferred != active_domain
and self._has_open_flow(user_id=user_id, domain=active_domain)
):
self._queue_order(user_id=user_id, domain=inferred, order_message=message)
return (
message,
None,
self._render_open_flow_prompt(user_id=user_id, domain=active_domain),
)
return message, None, None
if self._has_open_flow(user_id=user_id, domain=active_domain):
for queued in extracted_orders:
if queued["domain"] != active_domain:
self._queue_order(user_id=user_id, domain=queued["domain"], order_message=queued["message"])
return (
message,
None,
self._render_open_flow_prompt(user_id=user_id, domain=active_domain),
)
first = extracted_orders[0]
for queued in extracted_orders[1:]:
self._queue_order(user_id=user_id, domain=queued["domain"], order_message=queued["message"])
context["active_domain"] = first["domain"]
queue_notice = None
return first["message"], queue_notice, None
def _compose_order_aware_response(self, response: str, user_id: int | None, queue_notice: str | None = None) -> str:
lines = []
if queue_notice:
lines.append(queue_notice)
lines.append(response)
return "\n".join(lines)
def _render_open_flow_prompt(self, user_id: int | None, domain: str) -> str:
if domain == "review" and user_id is not None:
draft = self.PENDING_REVIEW_DRAFTS.get(user_id)
if draft:
missing = [field for field in self.REVIEW_REQUIRED_FIELDS if field not in draft.get("payload", {})]
if missing:
return self._render_missing_review_fields_prompt(missing)
pending = self.PENDING_REVIEW_CONFIRMATIONS.get(user_id)
if pending:
return "Antes de mudar de assunto, me confirme se podemos concluir seu agendamento de revisao."
return "Vamos concluir este assunto primeiro e ja sigo com o proximo em seguida."
def _build_next_order_transition(self, domain: str) -> str:
if domain == "sales":
return "Agora, sobre a compra do veiculo:"
if domain == "review":
return "Agora, sobre o agendamento da revisao:"
return "Agora, sobre o proximo assunto:"
async def _maybe_auto_advance_next_order(self, base_response: str, user_id: int | None) -> str:
context = self._get_user_context(user_id)
if not context:
return base_response
if context.get("pending_switch"):
return base_response
active_domain = context.get("active_domain", "general")
if self._has_open_flow(user_id=user_id, domain=active_domain):
return base_response
next_order = self._pop_next_order(user_id=user_id)
if not next_order:
return base_response
context["active_domain"] = next_order["domain"]
next_response = await self.handle_message(next_order["message"], user_id=user_id)
transition = self._build_next_order_transition(next_order["domain"])
return f"{base_response}\n\n{transition}\n{next_response}"
def _infer_domain(self, message: str) -> str:
lowered = self._normalize_text(message)
review_keywords = (
"revisao",
"agendamento",
"agendar",
"remarcar",
"cancelar agendamento",
"placa",
)
sales_keywords = (
"comprar",
"compra",
"carro",
"carros",
"veiculo",
"veiculos",
"carro",
"estoque",
"financi",
"pedido",
)
return any(k in lowered for k in keywords)
def _has_review_protocol(self, text: str) -> bool:
return re.search(r"\brev-\d{8}-[a-z0-9]+\b", (text or "").lower()) is not None
has_review = any(k in lowered for k in review_keywords)
has_sales = any(k in lowered for k in sales_keywords)
if has_review and not has_sales:
return "review"
if has_sales and not has_review:
return "sales"
return "general"
def _resolve_primary_intent_message(self, message: str, user_id: int | None) -> str:
# Em mensagens mistas ("cancele ... agora quero comprar"), prioriza compra
# quando nao ha protocolo explicito de revisao.
if not self._is_purchase_intent(message):
return message
if not self._is_review_management_intent(message):
return message
if self._has_review_protocol(message):
return message
def _is_context_switch_confirmation(self, message: str) -> bool:
return self._is_affirmative_message(message) or self._is_negative_message(message)
lowered = self._normalize_text(message)
buy_markers = ("agora quero comprar", "quero comprar", "comprar", "compra")
idx = -1
for marker in buy_markers:
pos = lowered.rfind(marker)
if pos > idx:
idx = pos
# Se identificar trecho de compra, usa apenas ele para rotear.
if idx >= 0:
def _has_open_flow(self, user_id: int | None, domain: str) -> bool:
if user_id is None:
return False
if domain == "review":
return bool(
self.PENDING_REVIEW_DRAFTS.get(user_id)
or self.PENDING_REVIEW_CONFIRMATIONS.get(user_id)
)
return False
def _apply_domain_switch(self, user_id: int | None, target_domain: str) -> None:
context = self._get_user_context(user_id)
if not context:
return
previous_domain = context.get("active_domain", "general")
if previous_domain == "review":
self._reset_pending_review_states(user_id=user_id)
return (message or "")[idx:].strip() or message
context["active_domain"] = target_domain
context["pending_switch"] = None
def _handle_context_switch(self, message: str, user_id: int | None) -> str | None:
context = self._get_user_context(user_id)
if not context:
return None
pending_switch = context.get("pending_switch")
if pending_switch:
if pending_switch["expires_at"] < datetime.utcnow():
context["pending_switch"] = None
elif self._is_context_switch_confirmation(message):
if self._is_affirmative_message(message):
target_domain = pending_switch["target_domain"]
self._apply_domain_switch(user_id=user_id, target_domain=target_domain)
return self._render_context_switched_message(target_domain=target_domain)
context["pending_switch"] = None
return "Perfeito, vamos continuar no fluxo atual."
current_domain = context.get("active_domain", "general")
target_domain = self._infer_domain(message)
if target_domain == "general" or target_domain == current_domain:
return None
if not self._has_open_flow(user_id=user_id, domain=current_domain):
return None
context["pending_switch"] = {
"source_domain": current_domain,
"target_domain": target_domain,
"expires_at": datetime.utcnow() + timedelta(minutes=15),
}
return self._render_context_switch_confirmation(
source_domain=current_domain,
target_domain=target_domain,
)
def _update_active_domain(self, message: str, user_id: int | None) -> None:
context = self._get_user_context(user_id)
if not context:
return
detected = self._infer_domain(message)
if detected != "general":
context["active_domain"] = detected
def _domain_label(self, domain: str) -> str:
labels = {
"review": "agendamento de revisao",
"sales": "compra de veiculo",
"general": "atendimento geral",
}
return labels.get(domain, "atendimento")
def _render_context_switch_confirmation(self, source_domain: str, target_domain: str) -> str:
return (
f"Entendi que voce quer sair de {self._domain_label(source_domain)} "
f"e ir para {self._domain_label(target_domain)}. Tem certeza?"
)
def _render_context_switched_message(self, target_domain: str) -> str:
return f"Certo, contexto anterior encerrado. Vamos seguir com {self._domain_label(target_domain)}."
def _build_context_summary(self, user_id: int | None) -> str:
context = self._get_user_context(user_id)
if not context:
return "Contexto de conversa: sem contexto ativo."
return message
domain = context.get("active_domain", "general")
memory = context.get("generic_memory", {})
order_queue = context.get("order_queue", [])
summary = [f"Contexto de conversa ativo: {self._domain_label(domain)}."]
if memory:
summary.append(f"Memoria generica temporaria: {memory}.")
if order_queue:
summary.append(f"Fila de pedidos pendentes: {len(order_queue)}.")
return " ".join(summary)
def _should_use_deterministic_response(self, tool_name: str) -> bool:
return tool_name in self.DETERMINISTIC_RESPONSE_TOOLS
@ -390,7 +754,7 @@ class OrquestradorService:
def _is_affirmative_message(self, text: str) -> bool:
normalized = self._normalize_text(text).strip()
normalized = re.sub(r"[.!?,;:]+$", "", normalized)
return normalized in {"sim", "pode", "ok", "confirmo", "aceito", "fechado", "pode sim"}
return normalized in {"sim", "pode", "ok", "confirmo", "aceito", "fechado", "pode sim", "tenho", "tenho sim"}
def _is_negative_message(self, text: str) -> bool:
normalized = self._normalize_text(text).strip()
@ -529,21 +893,25 @@ class OrquestradorService:
def _build_router_prompt(self, user_message: str, user_id: int | None) -> str:
user_context = f"Contexto de usuario autenticado: user_id={user_id}.\n" if user_id else ""
conversation_context = self._build_context_summary(user_id=user_id)
return (
"Voce e um assistente de concessionaria. "
"Sempre que a solicitacao depender de dados operacionais (estoque, validacao de cliente, "
"avaliacao de troca, agendamento de revisao, realizacao ou cancelamento de pedido), use a tool correta. "
"Se faltar parametro obrigatorio para a tool, responda em texto pedindo apenas o que falta.\n\n"
f"{user_context}"
f"{conversation_context}\n"
f"Mensagem do usuario: {user_message}"
)
def _build_force_tool_prompt(self, user_message: str, user_id: int | None) -> str:
user_context = f"Contexto de usuario autenticado: user_id={user_id}.\n" if user_id else ""
conversation_context = self._build_context_summary(user_id=user_id)
return (
"Reavalie a mensagem e priorize chamar tool se houver intencao operacional. "
"Use texto apenas quando faltar dado obrigatorio.\n\n"
f"{user_context}"
f"{conversation_context}\n"
f"Mensagem do usuario: {user_message}"
)
@ -555,11 +923,13 @@ class OrquestradorService:
tool_result,
) -> str:
user_context = f"Contexto de usuario autenticado: user_id={user_id}.\n" if user_id else ""
conversation_context = self._build_context_summary(user_id=user_id)
return (
"Responda ao usuario de forma objetiva usando o resultado da tool abaixo. "
"Nao invente dados. Se a lista vier vazia, diga explicitamente que nao encontrou resultados. "
"Retorne texto puro sem markdown, sem asteriscos, sem emojis e com linhas curtas.\n\n"
f"{user_context}"
f"{conversation_context}\n"
f"Pergunta original: {user_message}\n"
f"Tool executada: {tool_name}\n"
f"Resultado da tool: {tool_result}"

Loading…
Cancel
Save