🧠 feat(orquestrador): ampliar controle conversacional e hidratar cliente mock por CPF

- adiciona tools de orquestracao para limpar contexto, descartar fila, cancelar fluxo e continuar pedidos pendentes
- prioriza a decisao do LLM para comandos globais antes dos slot fillings ativos
- melhora selecao entre pedidos concorrentes e resposta deterministica das tools de orquestracao
- estrutura conflitos de horario de revisao para facilitar confirmacao posterior
- reaproveita CPF da memoria/perfil do usuario no fluxo de compra
- cria servico mock para hidratar customer e vincular users.cpf ao informar um CPF valido
main
parent e955d64306
commit 57dc824242

@ -11,10 +11,12 @@ from app.api.schemas import (
CancelarPedidoRequest,
ConsultarEstoqueRequest,
EditarDataRevisaoRequest,
HidratarClienteMockRequest,
ListarAgendamentosRevisaoRequest,
RealizarPedidoRequest,
ValidarClienteVendaRequest,
)
from app.services.user.mock_customer_service import hydrate_mock_customer_from_cpf
from app.services.tools.handlers import (
agendar_revisao,
avaliar_veiculo_troca,
@ -60,6 +62,22 @@ async def validar_cliente_venda_endpoint(
raise HTTPException(status_code=503, detail=db_error_detail(exc))
@router.post("/hidratar-cliente-mock")
async def hidratar_cliente_mock_endpoint(
    body: HidratarClienteMockRequest,
) -> Dict[str, Any]:
    """Create or link a mock customer from the given CPF, simulating an external lookup."""
    try:
        result = await hydrate_mock_customer_from_cpf(
            cpf=body.cpf,
            user_id=body.user_id,
        )
    except ValueError as err:
        # Invalid CPF -> client error.
        raise HTTPException(status_code=400, detail=str(err))
    except SQLAlchemyError as err:
        # Database failure -> service unavailable.
        raise HTTPException(status_code=503, detail=db_error_detail(err))
    return result
@router.post("/avaliar-veiculo-troca")
async def avaliar_veiculo_troca_endpoint(
body: AvaliarVeiculoTrocaRequest,

@ -39,6 +39,11 @@ class ValidarClienteVendaRequest(BaseModel):
valor_veiculo: float
class HidratarClienteMockRequest(BaseModel):
    """Request payload for the mock-customer hydration endpoint."""

    # CPF informed by the caller; validation happens in the service layer.
    cpf: str
    # Optional user to link the hydrated customer to.
    user_id: Optional[int] = None
class AvaliarVeiculoTrocaRequest(BaseModel):
modelo: str
ano: int

@ -258,6 +258,71 @@ def get_tools_definitions():
"required": ["numero_pedido", "motivo"],
},
},
{
"name": "limpar_contexto_conversa",
"description": (
"Use esta ferramenta quando o usuario pedir para recomecar o atendimento, "
"esquecer o contexto atual, limpar memoria volatil ou iniciar do zero. "
"Ela limpa fila de pedidos, fluxos pendentes e contexto ativo do usuario."
),
"parameters": {
"type": "object",
"properties": {
"motivo": {
"type": "string",
"description": "Resumo curto do motivo da limpeza de contexto. Opcional.",
},
},
"required": [],
},
},
{
"name": "continuar_proximo_pedido",
"description": (
"Use esta ferramenta quando o usuario pedir para continuar, seguir ou abrir "
"o proximo pedido que ficou na fila do atendimento."
),
"parameters": {
"type": "object",
"properties": {},
"required": [],
},
},
{
"name": "descartar_pedidos_pendentes",
"description": (
"Use esta ferramenta quando o usuario pedir para cancelar, ignorar ou "
"descartar apenas os pedidos pendentes da fila, sem apagar necessariamente "
"todo o contexto da conversa."
),
"parameters": {
"type": "object",
"properties": {
"motivo": {
"type": "string",
"description": "Resumo curto do motivo para descartar a fila pendente. Opcional.",
},
},
"required": [],
},
},
{
"name": "cancelar_fluxo_atual",
"description": (
"Use esta ferramenta quando o usuario pedir para cancelar apenas o fluxo "
"atual em andamento, sem limpar toda a memoria da conversa."
),
"parameters": {
"type": "object",
"properties": {
"motivo": {
"type": "string",
"description": "Resumo curto do motivo do cancelamento do fluxo atual. Opcional.",
},
},
"required": [],
},
},
]

@ -3,12 +3,15 @@ from datetime import datetime, timedelta
from fastapi import HTTPException
from app.db.mock_database import SessionMockLocal
from app.db.mock_models import User
from app.services.orchestration.orchestrator_config import (
CANCEL_ORDER_REQUIRED_FIELDS,
ORDER_REQUIRED_FIELDS,
PENDING_CANCEL_ORDER_DRAFT_TTL_MINUTES,
PENDING_ORDER_DRAFT_TTL_MINUTES,
)
from app.services.user.mock_customer_service import hydrate_mock_customer_from_cpf
class OrderFlowMixin:
@ -44,6 +47,30 @@ class OrderFlowMixin:
if isinstance(budget, (int, float)) and budget > 0:
payload["valor_veiculo"] = float(budget)
def _try_prefill_order_cpf_from_memory(self, user_id: int | None, payload: dict) -> None:
    """Fill payload["cpf"] from the user's volatile memory when it is missing."""
    if user_id is None:
        return
    if payload.get("cpf"):
        return
    ctx = self._get_user_context(user_id)
    if not ctx:
        return
    remembered_cpf = ctx.get("generic_memory", {}).get("cpf")
    if isinstance(remembered_cpf, str) and self._is_valid_cpf(remembered_cpf):
        payload["cpf"] = remembered_cpf
def _try_prefill_order_cpf_from_user_profile(self, user_id: int | None, payload: dict) -> None:
    """Fill payload["cpf"] from the persisted user profile when it is missing."""
    if user_id is None or payload.get("cpf"):
        return
    session = SessionMockLocal()
    try:
        profile = session.query(User).filter(User.id == user_id).first()
        profile_cpf = getattr(profile, "cpf", None)
        if isinstance(profile_cpf, str) and self._is_valid_cpf(profile_cpf):
            payload["cpf"] = profile_cpf
    finally:
        # Always release the mock DB session.
        session.close()
def _render_missing_order_fields_prompt(self, missing_fields: list[str]) -> str:
labels = {
"cpf": "o CPF do cliente",
@ -101,6 +128,8 @@ class OrderFlowMixin:
}
draft["payload"].update(extracted)
self._try_prefill_order_cpf_from_memory(user_id=user_id, payload=draft["payload"])
self._try_prefill_order_cpf_from_user_profile(user_id=user_id, payload=draft["payload"])
self._try_prefill_order_value_from_memory(user_id=user_id, payload=draft["payload"])
cpf_value = draft["payload"].get("cpf")
@ -108,6 +137,16 @@ class OrderFlowMixin:
draft["payload"].pop("cpf", None)
self.state.set_entry("pending_order_drafts", user_id, draft)
return "Para seguir com o pedido, preciso de um CPF valido com 11 digitos."
if cpf_value:
try:
await hydrate_mock_customer_from_cpf(
cpf=str(cpf_value),
user_id=user_id,
)
except ValueError:
draft["payload"].pop("cpf", None)
self.state.set_entry("pending_order_drafts", user_id, draft)
return "Para seguir com o pedido, preciso de um CPF valido com 11 digitos."
valor = draft["payload"].get("valor_veiculo")
if valor is not None:

@ -25,6 +25,7 @@ class ConversationStateStore:
"generic_memory": {},
"shared_memory": {},
"order_queue": [],
"pending_order_selection": None,
"pending_switch": None,
"expires_at": now + timedelta(minutes=ttl_minutes),
}

@ -1,4 +1,5 @@
# TTL (in minutes) for each kind of volatile conversation state.
USER_CONTEXT_TTL_MINUTES = 60
PENDING_ORDER_SELECTION_TTL_MINUTES = 15
PENDING_REVIEW_TTL_MINUTES = 30
PENDING_REVIEW_DRAFT_TTL_MINUTES = 30
@ -46,4 +47,15 @@ DETERMINISTIC_RESPONSE_TOOLS = {
"editar_data_revisao",
"cancelar_pedido",
"realizar_pedido",
"limpar_contexto_conversa",
"continuar_proximo_pedido",
"descartar_pedidos_pendentes",
"cancelar_fluxo_atual",
}
# Tools that control the conversation itself (reset context, manage the
# pending-order queue, cancel the active flow) rather than business operations.
ORCHESTRATION_CONTROL_TOOLS = {
    "limpar_contexto_conversa",
    "continuar_proximo_pedido",
    "descartar_pedidos_pendentes",
    "cancelar_fluxo_atual",
}

@ -10,6 +10,8 @@ from sqlalchemy.orm import Session
from app.services.orchestration.orchestrator_config import (
DETERMINISTIC_RESPONSE_TOOLS,
LOW_VALUE_RESPONSES,
ORCHESTRATION_CONTROL_TOOLS,
PENDING_ORDER_SELECTION_TTL_MINUTES,
PENDING_REVIEW_TTL_MINUTES,
USER_CONTEXT_TTL_MINUTES,
)
@ -34,7 +36,15 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin):
def __init__(self, db: Session):
    """Initialize the LLM service and tool registry for the current session.

    The registry receives the orchestration control handlers so that
    conversation-control tools dispatch like any other tool.
    """
    self.llm = LLMService()
    # The previous bare `ToolRegistry(db)` assignment was a dead store,
    # immediately overwritten by the line below; it has been removed.
    self.registry = ToolRegistry(db, extra_handlers=self._build_orchestration_tool_handlers())
def _build_orchestration_tool_handlers(self) -> dict:
    """Map orchestration tool names to their bound handler coroutines."""
    bindings = (
        ("limpar_contexto_conversa", self._tool_limpar_contexto_conversa),
        ("continuar_proximo_pedido", self._tool_continuar_proximo_pedido),
        ("descartar_pedidos_pendentes", self._tool_descartar_pedidos_pendentes),
        ("cancelar_fluxo_atual", self._tool_cancelar_fluxo_atual),
    )
    return dict(bindings)
async def handle_message(self, message: str, user_id: int | None = None) -> str:
"""Processa mensagem, executa tool quando necessario e retorna resposta final."""
@ -50,6 +60,12 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin):
)
self._upsert_user_context(user_id=user_id)
pending_order_selection = await self._try_resolve_pending_order_selection(
message=message,
user_id=user_id,
)
if pending_order_selection:
return pending_order_selection
queued_followup = await self._try_continue_queued_order(message=message, user_id=user_id)
if queued_followup:
return queued_followup
@ -105,6 +121,16 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin):
self._update_active_domain(user_id=user_id, domain_hint=domain_hint)
orchestration_override = await self._try_execute_orchestration_control_tool(
message=routing_message,
user_id=user_id,
extracted_entities=extracted_entities,
queue_notice=queue_notice,
finish=finish,
)
if orchestration_override:
return orchestration_override
review_management_response = await self._try_handle_review_management(
message=routing_message,
user_id=user_id,
@ -225,6 +251,72 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin):
)
return await finish(text, queue_notice=queue_notice)
async def _try_execute_orchestration_control_tool(
    self,
    message: str,
    user_id: int | None,
    extracted_entities: dict,
    queue_notice: str | None,
    finish,
) -> str | None:
    """Let the LLM route global conversation-control commands to orchestration tools.

    Runs a first routing pass; if the LLM picks an orchestration tool it is
    executed immediately. When no tool was chosen but there is pending state
    (open flow, pending switch or queued orders) and the text answer looks
    low-value, a second, tool-forcing pass is attempted.

    Returns the finished response text, or None when no orchestration tool applies.
    """
    tools = self.registry.get_tools()
    llm_result = await self.llm.generate_response(
        message=self._build_router_prompt(user_message=message, user_id=user_id),
        tools=tools,
    )
    tool_call = llm_result.get("tool_call") or {}
    tool_name = tool_call.get("name")
    if tool_name in ORCHESTRATION_CONTROL_TOOLS:
        return await self._finish_orchestration_tool(
            tool_name=tool_name,
            arguments=tool_call.get("arguments") or {},
            user_id=user_id,
            queue_notice=queue_notice,
            finish=finish,
        )
    if tool_name:
        # A non-orchestration tool was chosen; let the regular pipeline handle it.
        return None
    first_pass_text = (llm_result.get("response") or "").strip()
    context = self._get_user_context(user_id) or {}
    has_pending_state = (
        self._has_open_flow(user_id=user_id, domain="review")
        or self._has_open_flow(user_id=user_id, domain="sales")
        or bool(context.get("pending_switch"))
        or bool(context.get("order_queue"))
    )
    if not (has_pending_state and self._is_low_value_response(first_pass_text)):
        return None
    llm_result = await self.llm.generate_response(
        message=self._build_force_tool_prompt(user_message=message, user_id=user_id),
        tools=tools,
    )
    forced_tool_call = llm_result.get("tool_call") or {}
    forced_tool_name = forced_tool_call.get("name")
    if forced_tool_name not in ORCHESTRATION_CONTROL_TOOLS:
        return None
    return await self._finish_orchestration_tool(
        tool_name=forced_tool_name,
        arguments=forced_tool_call.get("arguments") or {},
        user_id=user_id,
        queue_notice=queue_notice,
        finish=finish,
    )

async def _finish_orchestration_tool(
    self,
    tool_name: str,
    arguments: dict,
    user_id: int | None,
    queue_notice: str | None,
    finish,
):
    """Execute one orchestration tool and render its deterministic response.

    Extracted helper: the execute-and-finish sequence was duplicated for the
    first-pass and forced-pass branches above.
    """
    try:
        tool_result = await self.registry.execute(
            tool_name,
            arguments,
            user_id=user_id,
        )
    except HTTPException as exc:
        return await finish(self._http_exception_detail(exc), queue_notice=queue_notice)
    return await finish(
        self._fallback_format_tool_result(tool_name, tool_result),
        queue_notice=queue_notice,
    )
def _reset_pending_review_states(self, user_id: int | None) -> None:
if user_id is None:
return
@ -239,6 +331,127 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin):
self.state.pop_entry("pending_order_drafts", user_id)
self.state.pop_entry("pending_cancel_order_drafts", user_id)
def _clear_user_conversation_state(self, user_id: int | None) -> None:
    """Wipe queues, drafts, memories and pending flows for the user's context."""
    context = self._get_user_context(user_id)
    if not context:
        return
    self._reset_pending_review_states(user_id=user_id)
    self._reset_pending_order_states(user_id=user_id)
    context.update(
        {
            "active_domain": "general",
            "generic_memory": {},
            "shared_memory": {},
            "order_queue": [],
            "pending_order_selection": None,
            "pending_switch": None,
        }
    )
def _clear_pending_order_navigation(self, user_id: int | None) -> int:
    """Drop queued orders, pending switch and pending selection; return the drop count."""
    context = self._get_user_context(user_id)
    if not context:
        return 0
    removed = len(context.get("order_queue", []))
    removed += 1 if context.get("pending_switch") else 0
    selection = context.get("pending_order_selection")
    if selection:
        removed += len(selection.get("orders") or [])
    context["order_queue"] = []
    context["pending_switch"] = None
    context["pending_order_selection"] = None
    return removed
def _cancel_active_flow(self, user_id: int | None) -> str:
    """Cancel only the flow in progress for the active domain, keeping memory."""
    context = self._get_user_context(user_id)
    if not context:
        return "Nao havia contexto ativo para cancelar."
    active_domain = context.get("active_domain", "general")
    had_flow = self._has_open_flow(user_id=user_id, domain=active_domain)
    reset_by_domain = {
        "review": self._reset_pending_review_states,
        "sales": self._reset_pending_order_states,
    }
    reset = reset_by_domain.get(active_domain)
    if reset is not None:
        reset(user_id=user_id)
    context["pending_switch"] = None
    if not had_flow:
        return "Nao havia fluxo em andamento para cancelar."
    return f"Fluxo atual de {self._domain_label(active_domain)} cancelado."
async def _continue_next_order_now(self, user_id: int | None) -> str:
    """Resume the next queued order (or a pending domain switch) immediately.

    Priority order: a pending two-order selection blocks continuation; then a
    pending switch carrying a queued message is resumed; finally the next
    order popped from the queue is replayed through handle_message.
    """
    context = self._get_user_context(user_id)
    if not context:
        return "Nao encontrei contexto ativo para continuar."
    if context.get("pending_order_selection"):
        # The user still owes us a choice between two detected orders.
        return "Ainda preciso que voce escolha qual das duas acoes deseja iniciar primeiro."
    pending_switch = context.get("pending_switch")
    if isinstance(pending_switch, dict):
        queued_message = str(pending_switch.get("queued_message") or "").strip()
        if queued_message:
            target_domain = str(pending_switch.get("target_domain") or "general")
            memory_seed = dict(pending_switch.get("memory_seed") or {})
            self._apply_domain_switch(user_id=user_id, target_domain=target_domain)
            # Re-fetch the context after the switch before seeding memory.
            refreshed = self._get_user_context(user_id)
            if refreshed is not None:
                refreshed["generic_memory"] = memory_seed
            transition = self._build_next_order_transition(target_domain)
            # Replay the stored message through the full pipeline.
            next_response = await self.handle_message(queued_message, user_id=user_id)
            return f"{transition}\n{next_response}"
    next_order = self._pop_next_order(user_id=user_id)
    if not next_order:
        return "Nao ha pedidos pendentes na fila para continuar."
    target_domain = str(next_order.get("domain") or "general")
    memory_seed = dict(next_order.get("memory_seed") or self._new_tab_memory(user_id=user_id))
    self._apply_domain_switch(user_id=user_id, target_domain=target_domain)
    refreshed = self._get_user_context(user_id)
    if refreshed is not None:
        refreshed["generic_memory"] = memory_seed
    transition = self._build_next_order_transition(target_domain)
    next_response = await self.handle_message(str(next_order.get("message") or ""), user_id=user_id)
    return f"{transition}\n{next_response}"
async def _tool_limpar_contexto_conversa(
    self,
    motivo: str | None = None,
    user_id: int | None = None,
) -> dict:
    """Orchestration tool: reset the whole conversation state for the user."""
    self._clear_user_conversation_state(user_id=user_id)
    lines = ["Contexto da conversa limpo. Podemos recomecar do zero."]
    if motivo:
        lines.append(f"Motivo registrado: {motivo.strip()}")
    return {"message": "\n".join(lines)}
async def _tool_descartar_pedidos_pendentes(
    self,
    motivo: str | None = None,
    user_id: int | None = None,
) -> dict:
    """Orchestration tool: drop only the pending order queue/navigation state."""
    dropped = self._clear_pending_order_navigation(user_id=user_id)
    if dropped <= 0:
        summary = "Nao havia pedidos pendentes na fila para descartar."
    else:
        noun = "pedidos pendentes" if dropped > 1 else "pedido pendente"
        summary = f"Descartei {dropped} {noun} da fila."
    if motivo:
        summary = f"{summary}\nMotivo registrado: {motivo.strip()}"
    return {"message": summary}
async def _tool_cancelar_fluxo_atual(
    self,
    motivo: str | None = None,
    user_id: int | None = None,
) -> dict:
    """Orchestration tool: cancel only the flow currently in progress."""
    parts = [self._cancel_active_flow(user_id=user_id)]
    if motivo:
        parts.append(f"Motivo registrado: {motivo.strip()}")
    return {"message": "\n".join(parts)}
async def _tool_continuar_proximo_pedido(self, user_id: int | None = None) -> str:
    """Orchestration tool: resume the next queued order.

    Unlike the other orchestration handlers this returns a plain string;
    fallback_format_tool_result accepts both str and dict results.
    """
    return await self._continue_next_order_now(user_id=user_id)
# Volatile (in-memory) conversation state is configured here.
def _upsert_user_context(self, user_id: int | None) -> None:
    """Create or refresh the user's conversation context with the standard TTL."""
    self.state.upsert_user_context(user_id=user_id, ttl_minutes=USER_CONTEXT_TTL_MINUTES)
@ -597,18 +810,77 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin):
return False
return None
def _normalize_datetime_connector(self, text: str) -> str:
compact = " ".join(str(text or "").strip().split())
lowered = compact.lower()
marker = " as "
if marker in lowered:
index = lowered.index(marker)
return f"{compact[:index]} {compact[index + len(marker):]}".strip()
return compact
def _try_parse_iso_datetime(self, text: str) -> datetime | None:
candidate = str(text or "").strip()
if not candidate:
return None
try:
return datetime.fromisoformat(candidate.replace("Z", "+00:00"))
except ValueError:
return None
def _try_parse_datetime_with_formats(self, text: str, formats: tuple[str, ...]) -> datetime | None:
candidate = str(text or "").strip()
if not candidate:
return None
for fmt in formats:
try:
return datetime.strptime(candidate, fmt)
except ValueError:
continue
return None
def _try_parse_review_absolute_datetime(self, text: str) -> datetime | None:
    """Parse an absolute datetime for review scheduling.

    Tries ISO-8601 first, then Brazilian day-first formats, then
    year-first formats. Returns None when nothing matches.
    """
    normalized = self._normalize_datetime_connector(text)
    parsed = self._try_parse_iso_datetime(normalized)
    if parsed is not None:
        return parsed
    # Day-first (Brazilian) formats are tried before year-first ones.
    day_first_formats = (
        "%d/%m/%Y %H:%M",
        "%d/%m/%Y %H:%M:%S",
        "%d-%m-%Y %H:%M",
        "%d-%m-%Y %H:%M:%S",
    )
    year_first_formats = (
        "%Y/%m/%d %H:%M",
        "%Y/%m/%d %H:%M:%S",
        "%Y-%m-%d %H:%M",
        "%Y-%m-%d %H:%M:%S",
    )
    return self._try_parse_datetime_with_formats(normalized, day_first_formats + year_first_formats)
def _extract_hhmm_from_text(self, text: str) -> str | None:
    """Return the first plausible HH:MM token found in the text, zero-padded, or None."""
    for raw_token in self._normalize_datetime_connector(text).split():
        pieces = self._strip_token_edges(raw_token).split(":")
        # Accept HH:MM or HH:MM:SS shaped tokens only.
        if len(pieces) not in (2, 3):
            continue
        if not all(piece.isdigit() for piece in pieces):
            continue
        hour, minute = int(pieces[0]), int(pieces[1])
        if hour < 24 and minute < 60:
            return f"{hour:02d}:{minute:02d}"
    return None
def _normalize_review_datetime_text(self, value) -> str | None:
text = str(value or "").strip()
if not text:
return None
# Mantem formatos absolutos que o handler ja sabe interpretar.
absolute_patterns = (
r"^\d{1,2}[/-]\d{1,2}[/-]\d{4}\s+(?:as\s+)?\d{1,2}:\d{2}(?::\d{2})?$",
r"^\d{4}[/-]\d{1,2}[/-]\d{1,2}\s+(?:as\s+)?\d{1,2}:\d{2}(?::\d{2})?$",
r"^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}(?::\d{2})?(?:Z|[+-]\d{2}:\d{2})?$",
)
if any(re.match(pattern, text, flags=re.IGNORECASE) for pattern in absolute_patterns):
absolute_dt = self._try_parse_review_absolute_datetime(text)
if absolute_dt is not None:
return text
normalized = self._normalize_text(text)
@ -620,12 +892,13 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin):
if day_offset is None:
return text
time_match = re.search(r"\b([01]?\d|2[0-3]):([0-5]\d)\b", normalized)
if not time_match:
time_text = self._extract_hhmm_from_text(normalized)
if not time_text:
return text
hour = int(time_match.group(1))
minute = int(time_match.group(2))
hour_text, minute_text = time_text.split(":")
hour = int(hour_text)
minute = int(minute_text)
target_date = datetime.now() + timedelta(days=day_offset)
return f"{target_date.strftime('%d/%m/%Y')} {hour:02d}:{minute:02d}"
@ -673,11 +946,63 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin):
extracted["revisao_previa_concessionaria"] = reviewed
return extracted
def _extract_review_protocol_from_text(self, text: str) -> str | None:
match = re.search(r"\bREV-[A-Z0-9\-]+\b", str(text or "").upper())
if not match:
def _tokenize_text(self, text: str) -> list[str]:
return [token for token in str(text or "").split() if token]
def _clean_protocol_token(self, token: str) -> str:
cleaned = str(token or "").strip().upper()
edge_chars = "[](){}<>,.;:!?\"'`"
while cleaned and cleaned[0] in edge_chars:
cleaned = cleaned[1:]
while cleaned and cleaned[-1] in edge_chars:
cleaned = cleaned[:-1]
return cleaned
def _strip_token_edges(self, token: str) -> str:
cleaned = str(token or "").strip()
edge_chars = "[](){}<>,.;:!?\"'`"
while cleaned and cleaned[0] in edge_chars:
cleaned = cleaned[1:]
while cleaned and cleaned[-1] in edge_chars:
cleaned = cleaned[:-1]
return cleaned
def _is_valid_protocol_suffix(self, value: str) -> bool:
if not value or len(value) < 4:
return False
return all(char.isalnum() for char in value)
def _normalize_review_protocol(self, value: str) -> str | None:
    """Validate and canonicalize a review protocol token (REV-YYYYMMDD-SUFFIX).

    Returns the normalized protocol string, or None when the token does not
    match the expected structure, carries an invalid date, or has a bad suffix.
    """
    candidate = self._clean_protocol_token(value)
    if not candidate.startswith("REV-"):
        return None
    # BUG FIX: a stray `return match.group(0)` line — left over from the old
    # regex-based implementation — referenced an undefined name here and has
    # been removed.
    parts = candidate.split("-")
    if len(parts) != 3:
        return None
    prefix, date_part, suffix_part = parts
    if prefix != "REV":
        return None
    if len(date_part) != 8 or not date_part.isdigit():
        return None
    try:
        # The middle segment must be a real calendar date.
        datetime.strptime(date_part, "%Y%m%d")
    except ValueError:
        return None
    if not self._is_valid_protocol_suffix(suffix_part):
        return None
    return f"{prefix}-{date_part}-{suffix_part}"
def _extract_review_protocol_from_text(self, text: str) -> str | None:
    """Find a review protocol in the message: token by token, then the whole text."""
    candidates = list(self._tokenize_text(text))
    candidates.append(str(text or ""))
    for candidate in candidates:
        normalized = self._normalize_review_protocol(candidate)
        if normalized:
            return normalized
    return None
def _normalize_review_management_fields(self, data) -> dict:
if not isinstance(data, dict):
@ -762,6 +1087,20 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin):
payload["placa"] = plate
def _queue_order(self, user_id: int | None, domain: str, order_message: str) -> None:
    """Queue an order with a fresh per-tab memory seed (convenience wrapper)."""
    self._queue_order_with_memory_seed(
        user_id=user_id,
        domain=domain,
        order_message=order_message,
        memory_seed=self._new_tab_memory(user_id=user_id),
    )
def _queue_order_with_memory_seed(
self,
user_id: int | None,
domain: str,
order_message: str,
memory_seed: dict | None = None,
) -> None:
context = self._get_user_context(user_id)
if not context:
return
@ -772,7 +1111,7 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin):
{
"domain": domain,
"message": (order_message or "").strip(),
"memory_seed": self._new_tab_memory(user_id=user_id),
"memory_seed": dict(memory_seed or self._new_tab_memory(user_id=user_id)),
"created_at": datetime.utcnow().isoformat(),
}
)
@ -814,6 +1153,14 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin):
if not extracted_orders:
extracted_orders = [{"domain": "general", "message": (message or "").strip()}]
if (
len(extracted_orders) == 2
and all(order["domain"] != "general" for order in extracted_orders)
and not self._has_open_flow(user_id=user_id, domain=active_domain)
):
self._store_pending_order_selection(user_id=user_id, orders=extracted_orders)
return message, None, self._render_order_selection_prompt(extracted_orders)
if len(extracted_orders) <= 1:
inferred = extracted_orders[0]["domain"]
if (
@ -868,6 +1215,212 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin):
lines.append(response)
return "\n".join(lines)
def _store_pending_order_selection(self, user_id: int | None, orders: list[dict]) -> None:
    """Persist (in context) up to two concurrent orders awaiting the user's choice."""
    context = self._get_user_context(user_id)
    if not context:
        return
    selection_orders = []
    for order in orders[:2]:
        selection_orders.append(
            {
                "domain": order["domain"],
                "message": order["message"],
                "memory_seed": self._new_tab_memory(user_id=user_id),
            }
        )
    expiry = datetime.utcnow() + timedelta(minutes=PENDING_ORDER_SELECTION_TTL_MINUTES)
    context["pending_order_selection"] = {
        "orders": selection_orders,
        "expires_at": expiry,
    }
def _render_order_selection_prompt(self, orders: list[dict]) -> str:
    """Ask the user which of the two detected actions should start first."""
    if len(orders) < 2:
        return "Qual das acoes voce quer iniciar primeiro?"
    option_one = self._describe_order_selection_option(orders[0])
    option_two = self._describe_order_selection_option(orders[1])
    lines = [
        "Identifiquei duas acoes na sua mensagem:",
        f"1. {option_one}",
        f"2. {option_two}",
        "Qual delas voce quer iniciar primeiro? Se for indiferente, eu escolho.",
    ]
    return "\n".join(lines)
def _describe_order_selection_option(self, order: dict) -> str:
domain = str(order.get("domain") or "general")
message = str(order.get("message") or "").strip()
domain_prefix = {
"review": "Revisao",
"sales": "Venda",
"general": "Atendimento",
}.get(domain, "Atendimento")
return f"{domain_prefix}: {message}"
def _contains_any_term(self, text: str, terms: set[str]) -> bool:
return any(term in text for term in terms)
def _strip_choice_message(self, text: str) -> str:
cleaned = (text or "").strip()
trailing_chars = ".!?,;:"
while cleaned and cleaned[-1] in trailing_chars:
cleaned = cleaned[:-1].rstrip()
return cleaned
def _remove_order_selection_reset_prefix(self, message: str) -> str:
    """Drop a leading "esqueca tudo"-style prefix and return the rest of the raw message."""
    raw = (message or "").strip()
    normalized = self._normalize_text(raw)
    # Longer variants first so "esqueca tudo agora" is not truncated as "esqueca tudo".
    prefixes = (
        "esqueca tudo agora",
        "esqueca tudo",
        "esquece tudo agora",
        "esquece tudo",
    )
    for prefix in prefixes:
        if normalized.startswith(prefix):
            # NOTE(review): slicing `raw` by a length measured on the normalized
            # text assumes _normalize_text is length-preserving (accent/case
            # folding only) — confirm against its implementation.
            return raw[len(prefix):].lstrip(" ,.:;-")
    return raw
def _is_order_selection_reset_message(self, message: str) -> bool:
    """Detect global "forget everything / start over" style messages (substring match)."""
    reset_terms = {
        "esqueca tudo",
        "esqueca tudo agora",
        "esquece tudo",
        "esquece tudo agora",
        "ignora isso",
        "ignore isso",
        "deixa isso",
        "deixa pra la",
        "deixa para la",
        "novo assunto",
        "muda de assunto",
        "vamos comecar de novo",
        "comecar de novo",
        "reiniciar",
        "resetar conversa",
    }
    normalized_message = self._normalize_text(message).strip()
    return self._contains_any_term(normalized_message, reset_terms)
def _looks_like_fresh_operational_request(self, message: str) -> bool:
    """Heuristic: a long-enough message that mentions an operational keyword."""
    normalized_message = self._normalize_text(message).strip()
    # Very short replies are treated as answers to the pending question.
    if len(normalized_message) < 15:
        return False
    keywords = {
        "agendar",
        "revisao",
        "cancelar",
        "pedido",
        "comprar",
        "compra",
        "carro",
        "veiculo",
        "remarcar",
        "tambem",
    }
    return self._contains_any_term(normalized_message, keywords)
def _detect_selected_order_index(self, message: str, orders: list[dict]) -> tuple[int | None, bool]:
    """Resolve which of the two pending orders the user picked.

    Returns (index, auto_selected): index is 0/1, or None when the answer is
    undecidable; auto_selected is True only for "whatever you choose" style
    answers, where option 0 is picked on the user's behalf.
    """
    normalized = self._strip_choice_message(self._normalize_text(message))
    # "Indifferent" answers: pick option 0 and flag the auto-selection.
    indifferent_tokens = {
        "tanto faz",
        "indiferente",
        "qualquer um",
        "qualquer uma",
        "voce escolhe",
        "pode escolher",
        "fica a seu criterio",
    }
    if normalized in indifferent_tokens:
        return 0, True
    # Explicit ordinal answers (exact match against the whole message).
    first_tokens = {"1", "primeiro", "primeira", "opcao 1", "acao 1", "pedido 1"}
    second_tokens = {"2", "segundo", "segunda", "opcao 2", "acao 2", "pedido 2"}
    if normalized in first_tokens:
        return 0, False
    if normalized in second_tokens:
        return 1, False
    # Domain-keyword answers: accepted only when exactly one order matches the
    # signalled domain and the message does not also signal the other domain.
    review_keywords = {
        "revisao",
        "agendamento",
        "agendar",
        "remarcar",
        "pos venda",
    }
    sales_keywords = {
        "venda",
        "compra",
        "comprar",
        "pedido",
        "cancelamento",
        "cancelar",
        "carro",
        "veiculo",
    }
    review_matches = [index for index, order in enumerate(orders) if order.get("domain") == "review"]
    sales_matches = [index for index, order in enumerate(orders) if order.get("domain") == "sales"]
    has_review_signal = self._contains_any_term(normalized, review_keywords)
    has_sales_signal = self._contains_any_term(normalized, sales_keywords)
    if len(review_matches) == 1 and has_review_signal and not has_sales_signal:
        return review_matches[0], False
    if len(sales_matches) == 1 and has_sales_signal and not has_review_signal:
        return sales_matches[0], False
    return None, False
async def _try_resolve_pending_order_selection(
    self,
    message: str,
    user_id: int | None,
) -> str | None:
    """Handle the reply to a pending "which order first?" question.

    Returns the response text when the message was consumed by the selection
    flow, or None so the normal pipeline continues.
    """
    context = self._get_user_context(user_id)
    if not context:
        return None
    pending = context.get("pending_order_selection")
    if not isinstance(pending, dict):
        return None
    # Expired selections are dropped silently.
    if pending.get("expires_at") and pending["expires_at"] < datetime.utcnow():
        context["pending_order_selection"] = None
        return None
    orders = pending.get("orders") or []
    if len(orders) < 2:
        context["pending_order_selection"] = None
        return None
    if self._is_order_selection_reset_message(message):
        # Global reset beats the pending choice; re-dispatch any remainder.
        self._clear_user_conversation_state(user_id=user_id)
        cleaned_message = self._remove_order_selection_reset_prefix(message)
        if not cleaned_message:
            return "Tudo bem. Limpei o contexto atual. Pode me dizer o que voce quer fazer agora?"
        return await self.handle_message(cleaned_message, user_id=user_id)
    selected_index, auto_selected = self._detect_selected_order_index(message=message, orders=orders)
    if selected_index is None:
        if self._looks_like_fresh_operational_request(message):
            # The user moved on to a new request: abandon the selection.
            context["pending_order_selection"] = None
            return None
        # Re-ask the same question.
        return self._render_order_selection_prompt(orders)
    selected_order = orders[selected_index]
    remaining_order = orders[1 - selected_index]
    context["pending_order_selection"] = None
    # The non-selected order goes back to the queue with its own memory seed.
    self._queue_order_with_memory_seed(
        user_id=user_id,
        domain=remaining_order["domain"],
        order_message=remaining_order["message"],
        memory_seed=remaining_order.get("memory_seed"),
    )
    intro = (
        f"Vou escolher e comecar por: {self._describe_order_selection_option(selected_order)}"
        if auto_selected
        else f"Perfeito. Vou comecar por: {self._describe_order_selection_option(selected_order)}"
    )
    next_response = await self.handle_message(str(selected_order.get("message") or ""), user_id=user_id)
    return f"{intro}\n{next_response}"
def _render_queue_notice(self, queued_count: int) -> str | None:
if queued_count <= 0:
return None
@ -1036,6 +1589,7 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin):
self._reset_pending_order_states(user_id=user_id)
context["active_domain"] = target_domain
context["generic_memory"] = self._new_tab_memory(user_id=user_id)
context["pending_order_selection"] = None
context["pending_switch"] = None
def _handle_context_switch(
@ -1060,6 +1614,10 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin):
context["pending_switch"] = None
return "Perfeito, vamos continuar no fluxo atual."
pending_order_selection = context.get("pending_order_selection")
if pending_order_selection and pending_order_selection.get("expires_at") < datetime.utcnow():
context["pending_order_selection"] = None
current_domain = context.get("active_domain", "general")
target_domain = target_domain_hint
if target_domain == "general" or target_domain == current_domain:
@ -1142,10 +1700,7 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin):
)
def _extract_time_only(self, text: str) -> str | None:
    """Extract an HH:MM time from free text.

    Delegates to _extract_hhmm_from_text. The inline regex body that was
    left above the delegating return (a diff-flattening remnant that made
    the delegation unreachable) has been removed.
    """
    return self._extract_hhmm_from_text(text)
def _merge_date_with_time(self, base_iso: str, new_time_hhmm: str) -> str | None:
try:
@ -1168,11 +1723,10 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin):
) -> None:
if tool_name != "agendar_revisao" or user_id is None or exc.status_code != 409:
return
detail = exc.detail if isinstance(exc.detail, str) else ""
match = re.search(r"ISO:\s*([^)]+)\)", detail)
if not match:
detail = exc.detail if isinstance(exc.detail, dict) else {}
suggested_iso = str(detail.get("suggested_iso") or "").strip()
if not suggested_iso:
return
suggested_iso = match.group(1).strip()
payload = dict(arguments or {})
if not payload.get("placa"):
return
@ -1280,6 +1834,10 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin):
detail = exc.detail
if isinstance(detail, str):
return detail
if isinstance(detail, dict):
message = str(detail.get("message") or "").strip()
if message:
return message
return "Nao foi possivel concluir a operacao solicitada."
def _fallback_format_tool_result(self, tool_name: str, tool_result) -> str:

@ -15,6 +15,11 @@ def build_router_prompt(
"Voce e um assistente de concessionaria. "
"Sempre que a solicitacao depender de dados operacionais (estoque, validacao de cliente, "
"avaliacao de troca, agendamento de revisao, realizacao ou cancelamento de pedido), use a tool correta. "
"Se o usuario pedir para recomecar, esquecer contexto, cancelar fluxo atual, descartar fila pendente "
"ou continuar o proximo pedido, use a tool de orquestracao apropriada. "
"Mensagens de controle da conversa tem prioridade sobre qualquer fluxo em aberto. "
"Se houver um rascunho ativo e o usuario mandar algo como esquecer tudo, cancelar fluxo, descartar pendencias "
"ou continuar, trate isso como comando global e chame a tool correspondente. "
"Se faltar parametro obrigatorio para a tool, responda em texto pedindo apenas o que falta.\n\n"
f"{user_context}"
f"{conversation_context}\n"
@ -30,6 +35,8 @@ def build_force_tool_prompt(
user_context = _build_user_context_line(user_id)
return (
"Reavalie a mensagem e priorize chamar tool se houver intencao operacional. "
"Considere tambem tools de orquestracao para limpar contexto, cancelar fluxo, descartar fila ou continuar o proximo pedido. "
"Mesmo com fluxo incremental ativo, se a mensagem for de controle global da conversa, a tool de orquestracao deve vencer o rascunho atual. "
"Use texto apenas quando faltar dado obrigatorio.\n\n"
f"{user_context}"
f"{conversation_context}\n"

@ -134,4 +134,17 @@ def fallback_format_tool_result(tool_name: str, tool_result: Any) -> str:
f"Limite: {limite}"
)
if tool_name in {
"limpar_contexto_conversa",
"continuar_proximo_pedido",
"descartar_pedidos_pendentes",
"cancelar_fluxo_atual",
}:
if isinstance(tool_result, str):
return tool_result
if isinstance(tool_result, dict):
message = str(tool_result.get("message") or "").strip()
if message:
return message
return "Operacao concluida com sucesso."

@ -8,7 +8,8 @@ from fastapi import HTTPException
from sqlalchemy import func
from app.db.mock_database import SessionMockLocal
from app.db.mock_models import Customer, Order, ReviewSchedule, Vehicle
from app.db.mock_models import Customer, Order, ReviewSchedule, User, Vehicle
from app.services.user.mock_customer_service import hydrate_mock_customer_from_cpf
# This module normalizes incoming data before persisting it to the mock DB.
@ -269,6 +270,31 @@ def _find_next_available_review_slot(
return None
def _build_review_conflict_detail(
    requested_dt: datetime,
    suggested_dt: Optional[datetime] = None,
) -> Dict[str, Any]:
    """Build the structured 409 payload for a review-slot scheduling conflict.

    The payload always carries ``code``, a pt-BR ``message``, the requested
    slot in ISO format, and the suggested slot in ISO format (``None`` when
    no alternative slot was found) so callers can confirm later.
    """
    requested_label = _format_datetime_pt_br(requested_dt)
    if suggested_dt is None:
        # No free slot found in the search window: message states the limit.
        message = (
            f"O horario {requested_label} ja esta ocupado e nao encontrei "
            "disponibilidade nas proximas 8 horas."
        )
        suggested_iso = None
    else:
        # An alternative slot exists: offer it in the message for confirmation.
        message = (
            f"O horario {requested_label} ja esta ocupado. "
            f"Posso agendar em {_format_datetime_pt_br(suggested_dt)}."
        )
        suggested_iso = suggested_dt.isoformat()
    return {
        "code": "review_schedule_conflict",
        "message": message,
        "requested_iso": requested_dt.isoformat(),
        "suggested_iso": suggested_iso,
    }
async def agendar_revisao(
placa: str,
data_hora: str,
@ -329,18 +355,14 @@ async def agendar_revisao(
if proximo_horario:
raise HTTPException(
status_code=409,
detail=(
f"O horario {_format_datetime_pt_br(dt)} ja esta ocupado. "
f"Posso agendar em {_format_datetime_pt_br(proximo_horario)} "
f"(ISO: {proximo_horario.isoformat()})."
detail=_build_review_conflict_detail(
requested_dt=dt,
suggested_dt=proximo_horario,
),
)
raise HTTPException(
status_code=409,
detail=(
f"O horario {_format_datetime_pt_br(dt)} ja esta ocupado e nao encontrei "
"disponibilidade nas proximas 8 horas."
),
detail=_build_review_conflict_detail(requested_dt=dt),
)
existente = db.query(ReviewSchedule).filter(ReviewSchedule.protocolo == protocolo).first()
@ -526,18 +548,14 @@ async def editar_data_revisao(
if proximo_horario:
raise HTTPException(
status_code=409,
detail=(
f"O horario {_format_datetime_pt_br(nova_data)} ja esta ocupado. "
f"Sugestao: {_format_datetime_pt_br(proximo_horario)} "
f"(ISO: {proximo_horario.isoformat()})."
detail=_build_review_conflict_detail(
requested_dt=nova_data,
suggested_dt=proximo_horario,
),
)
raise HTTPException(
status_code=409,
detail=(
f"O horario {_format_datetime_pt_br(nova_data)} ja esta ocupado e nao encontrei "
"disponibilidade nas proximas 8 horas."
),
detail=_build_review_conflict_detail(requested_dt=nova_data),
)
agendamento.data_hora = nova_data
@ -612,6 +630,7 @@ async def cancelar_pedido(numero_pedido: str, motivo: str, user_id: Optional[int
async def realizar_pedido(cpf: str, valor_veiculo: float, user_id: Optional[int] = None) -> Dict[str, Any]:
"""Cria um novo pedido de compra quando o cliente estiver aprovado para o valor informado."""
cpf_norm = normalize_cpf(cpf)
await hydrate_mock_customer_from_cpf(cpf=cpf_norm, user_id=user_id)
avaliacao = await validar_cliente_venda(cpf=cpf_norm, valor_veiculo=valor_veiculo)
if not avaliacao.get("aprovado"):
raise HTTPException(
@ -625,6 +644,11 @@ async def realizar_pedido(cpf: str, valor_veiculo: float, user_id: Optional[int]
numero_pedido = f"PED-{datetime.utcnow().strftime('%Y%m%d%H%M%S')}-{uuid4().hex[:6].upper()}"
db = SessionMockLocal()
try:
if user_id is not None:
user = db.query(User).filter(User.id == user_id).first()
if user and user.cpf != cpf_norm:
user.cpf = cpf_norm
pedido = Order(
numero_pedido=numero_pedido,
user_id=user_id,

@ -33,13 +33,16 @@ HANDLERS: Dict[str, Callable] = {
class ToolRegistry:
def __init__(self, db: Session):
def __init__(self, db: Session, extra_handlers: Dict[str, Callable] | None = None):
"""Carrega tools do banco e registra apenas as que possuem handler conhecido."""
self._tools = []
available_handlers = dict(HANDLERS)
if extra_handlers:
available_handlers.update(extra_handlers)
repo = ToolRepository(db)
db_tools = repo.get_all()
for db_tool in db_tools:
handler = HANDLERS.get(db_tool.name)
handler = available_handlers.get(db_tool.name)
if not handler:
continue
self.register_tool(

@ -0,0 +1,88 @@
import hashlib
from app.db.mock_database import SessionMockLocal
from app.db.mock_models import Customer, User
# Pool of deterministic fake customer names; `_build_mock_customer_profile`
# selects one by indexing with a CPF-derived hash, so the same CPF always
# hydrates the same mock profile.
MOCK_CUSTOMER_NAMES = [
    "Ana Souza",
    "Bruno Lima",
    "Carla Mendes",
    "Diego Santos",
    "Eduarda Alves",
    "Felipe Rocha",
    "Gabriela Costa",
    "Henrique Martins",
    "Isabela Ferreira",
    "Joao Ribeiro",
]
def _normalize_cpf(value: str) -> str:
return "".join(char for char in str(value or "") if char.isdigit())
def _stable_int(seed_text: str) -> int:
digest = hashlib.sha256(seed_text.encode("utf-8")).hexdigest()
return int(digest[:16], 16)
def _build_mock_customer_profile(cpf: str) -> dict:
    """Derive a deterministic fake customer record from a normalized CPF.

    The same CPF always yields the same name, score, credit limit and
    restriction flag, because every field is computed from a stable hash.
    """
    seed = _stable_int(cpf)
    base_name = MOCK_CUSTOMER_NAMES[seed % len(MOCK_CUSTOMER_NAMES)]
    # Numeric suffix (100-999) makes names unique-ish across CPFs sharing a base name.
    suffix = _stable_int(cpf + "-suffix") % 900 + 100
    return {
        "cpf": cpf,
        "nome": f"{base_name} {suffix}",
        "score": int(350 + (seed % 500)),
        "limite_credito": float(35_000 + (seed % 140_000)),
        "possui_restricao": (seed % 9 == 0),
    }
async def hydrate_mock_customer_from_cpf(
    cpf: str,
    user_id: int | None = None,
) -> dict:
    """Simulate an external CPF lookup and ensure fake data in the mock DB.

    Creates a deterministic ``Customer`` row for the CPF when none exists,
    and optionally links ``users.cpf`` to it when *user_id* is given.

    Local/mock phase only: replace or remove once the real API is wired in.

    Raises:
        ValueError: when the CPF does not normalize to exactly 11 digits.
    """
    normalized = _normalize_cpf(cpf)
    if len(normalized) != 11:
        raise ValueError("CPF invalido para hidratacao mock.")

    session = SessionMockLocal()
    try:
        created_customer = False
        customer = session.query(Customer).filter(Customer.cpf == normalized).first()
        if not customer:
            # Unknown CPF: fabricate a stable profile so repeat calls agree.
            customer = Customer(**_build_mock_customer_profile(normalized))
            session.add(customer)
            created_customer = True

        linked_user = False
        user = None
        if user_id is not None:
            user = session.query(User).filter(User.id == user_id).first()
            if user and user.cpf != normalized:
                # Bind the informed CPF to the requesting user for later reuse.
                user.cpf = normalized
                linked_user = True

        session.commit()
        session.refresh(customer)
        if user is not None:
            session.refresh(user)

        return {
            "cpf": customer.cpf,
            "nome": customer.nome,
            "score": int(customer.score),
            "limite_credito": float(customer.limite_credito),
            "possui_restricao": bool(customer.possui_restricao),
            "customer_created": created_customer,
            "user_linked": linked_user,
        }
    finally:
        session.close()
Loading…
Cancel
Save