🧩 refactor(orchestration): decompor orquestrador e mixins de fluxo

Extrai a gestao de contexto, memoria compartilhada, efeitos colaterais de tools e sugestoes de estoque do OrquestradorService para um OrchestratorContextManager dedicado, mantendo o servico principal focado na coordenacao do turno e preservando os contratos internos ja usados pela aplicacao.

Separa prompts, trace, normalizacao de invocacao, merge de drafts, execucao de tools e renderizacao com fallback em um OrchestratorExecutionManager, enquanto os mixins de venda, revisao e locacao passam a delegar buckets, snapshots e selecoes para helpers especificos apoiados por uma base FlowStateSupport reutilizavel.

Mantem a logica e as regras de negocio inalteradas, endurece a compatibilidade com cenarios de teste que instanciam o servico parcialmente e valida a refatoracao com a suite completa verde em 218 testes.
main
parent 6f69094412
commit 765108c904

@ -0,0 +1,131 @@
from __future__ import annotations
from app.core.time_utils import utc_now
class FlowStateSupport:
"""Utilitarios compartilhados para buckets e snapshots de fluxo."""
def __init__(self, service) -> None:
self.service = service
def get_state_repository(self):
return getattr(self.service, "state", None)
def get_state_entry(self, bucket: str, user_id: int | None, *, expire: bool = False):
state = self.get_state_repository()
if state is None or not hasattr(state, "get_entry"):
return None
return state.get_entry(bucket, user_id, expire=expire)
def set_state_entry(self, bucket: str, user_id: int | None, value) -> None:
state = self.get_state_repository()
if state is None or not hasattr(state, "set_entry"):
return
state.set_entry(bucket, user_id, value)
def pop_state_entry(self, bucket: str, user_id: int | None):
state = self.get_state_repository()
if state is None or not hasattr(state, "pop_entry"):
return None
return state.pop_entry(bucket, user_id)
def get_flow_snapshot(self, user_id: int | None, snapshot_key: str) -> dict | None:
if user_id is None or not hasattr(self.service, "_get_user_context"):
return None
context = self.service._get_user_context(user_id)
if not isinstance(context, dict):
return None
snapshots = context.get("flow_snapshots")
if not isinstance(snapshots, dict):
return None
snapshot = snapshots.get(snapshot_key)
return dict(snapshot) if isinstance(snapshot, dict) else None
def set_flow_snapshot(
self,
user_id: int | None,
snapshot_key: str,
value: dict | None,
*,
active_task: str | None = None,
) -> None:
if user_id is None or not hasattr(self.service, "_get_user_context") or not hasattr(self.service, "_save_user_context"):
return
context = self.service._get_user_context(user_id)
if not isinstance(context, dict):
return
snapshots = context.get("flow_snapshots")
if not isinstance(snapshots, dict):
snapshots = {}
context["flow_snapshots"] = snapshots
if isinstance(value, dict):
snapshots[snapshot_key] = value
if active_task:
context["active_task"] = active_task
collected_slots = context.get("collected_slots")
if not isinstance(collected_slots, dict):
collected_slots = {}
context["collected_slots"] = collected_slots
payload = value.get("payload")
if isinstance(payload, dict):
collected_slots[active_task] = dict(payload)
else:
snapshots.pop(snapshot_key, None)
if active_task and context.get("active_task") == active_task:
context["active_task"] = None
collected_slots = context.get("collected_slots")
if isinstance(collected_slots, dict) and active_task:
collected_slots.pop(active_task, None)
self.service._save_user_context(user_id=user_id, context=context)
def get_flow_entry(self, bucket: str, user_id: int | None, snapshot_key: str) -> dict | None:
entry = self.get_state_entry(bucket, user_id, expire=True)
if entry:
return entry
snapshot = self.get_flow_snapshot(user_id=user_id, snapshot_key=snapshot_key)
if not snapshot:
return None
if snapshot.get("expires_at") and snapshot["expires_at"] < utc_now():
self.set_flow_snapshot(user_id=user_id, snapshot_key=snapshot_key, value=None)
return None
self.set_state_entry(bucket, user_id, snapshot)
return snapshot
def set_flow_entry(
self,
bucket: str,
user_id: int | None,
snapshot_key: str,
value: dict,
*,
active_task: str | None = None,
) -> None:
self.set_state_entry(bucket, user_id, value)
self.set_flow_snapshot(
user_id=user_id,
snapshot_key=snapshot_key,
value=value,
active_task=active_task,
)
def pop_flow_entry(
self,
bucket: str,
user_id: int | None,
snapshot_key: str,
*,
active_task: str | None = None,
) -> dict | None:
entry = self.pop_state_entry(bucket, user_id)
self.set_flow_snapshot(
user_id=user_id,
snapshot_key=snapshot_key,
value=None,
active_task=active_task,
)
return entry

@ -15,43 +15,28 @@ from app.services.orchestration.orchestrator_config import (
PENDING_ORDER_SELECTION_TTL_MINUTES,
)
from app.services.user.mock_customer_service import hydrate_mock_customer_from_cpf
from app.services.flows.order_flow_support import OrderFlowStateSupport
# Esse mixin cuida dos fluxos de venda:
# criacao de pedido, selecao de veiculo e cancelamento.
class OrderFlowMixin:
@property
def _order_flow_state_support(self) -> OrderFlowStateSupport:
support = getattr(self, "__order_flow_state_support", None)
if support is None:
support = OrderFlowStateSupport(self)
setattr(self, "__order_flow_state_support", support)
return support
def _sanitize_stock_results(self, stock_results: list[dict] | None) -> list[dict]:
sanitized: list[dict] = []
for item in stock_results or []:
if not isinstance(item, dict):
continue
try:
vehicle_id = int(item.get("id"))
preco = float(item.get("preco") or 0)
except (TypeError, ValueError):
continue
sanitized.append(
{
"id": vehicle_id,
"modelo": str(item.get("modelo") or "").strip(),
"categoria": str(item.get("categoria") or "").strip(),
"preco": preco,
"budget_relaxed": bool(item.get("budget_relaxed", False)),
}
)
return sanitized
return self._order_flow_state_support.sanitize_stock_results(stock_results)
def _get_order_flow_snapshot(self, user_id: int | None, snapshot_key: str) -> dict | None:
if user_id is None or not hasattr(self, "_get_user_context"):
return None
context = self._get_user_context(user_id)
if not isinstance(context, dict):
return None
snapshots = context.get("flow_snapshots")
if not isinstance(snapshots, dict):
return None
snapshot = snapshots.get(snapshot_key)
return dict(snapshot) if isinstance(snapshot, dict) else None
return self._order_flow_state_support.get_flow_snapshot(
user_id=user_id,
snapshot_key=snapshot_key,
)
def _set_order_flow_snapshot(
self,
@ -61,51 +46,19 @@ class OrderFlowMixin:
*,
active_task: str | None = None,
) -> None:
if user_id is None or not hasattr(self, "_get_user_context") or not hasattr(self, "_save_user_context"):
return
context = self._get_user_context(user_id)
if not isinstance(context, dict):
return
snapshots = context.get("flow_snapshots")
if not isinstance(snapshots, dict):
snapshots = {}
context["flow_snapshots"] = snapshots
if isinstance(value, dict):
snapshots[snapshot_key] = value
if active_task:
context["active_task"] = active_task
collected_slots = context.get("collected_slots")
if not isinstance(collected_slots, dict):
collected_slots = {}
context["collected_slots"] = collected_slots
payload = value.get("payload")
if isinstance(payload, dict):
collected_slots[active_task] = dict(payload)
else:
snapshots.pop(snapshot_key, None)
if active_task and context.get("active_task") == active_task:
context["active_task"] = None
collected_slots = context.get("collected_slots")
if isinstance(collected_slots, dict) and active_task:
collected_slots.pop(active_task, None)
self._save_user_context(user_id=user_id, context=context)
self._order_flow_state_support.set_flow_snapshot(
user_id=user_id,
snapshot_key=snapshot_key,
value=value,
active_task=active_task,
)
def _get_order_flow_entry(self, bucket: str, user_id: int | None, snapshot_key: str) -> dict | None:
entry = self.state.get_entry(bucket, user_id, expire=True)
if entry:
return entry
snapshot = self._get_order_flow_snapshot(user_id=user_id, snapshot_key=snapshot_key)
if not snapshot:
return None
if snapshot.get("expires_at") and snapshot["expires_at"] < utc_now():
self._set_order_flow_snapshot(user_id=user_id, snapshot_key=snapshot_key, value=None)
return None
self.state.set_entry(bucket, user_id, snapshot)
return snapshot
return self._order_flow_state_support.get_flow_entry(
bucket=bucket,
user_id=user_id,
snapshot_key=snapshot_key,
)
def _set_order_flow_entry(
self,
@ -116,8 +69,8 @@ class OrderFlowMixin:
*,
active_task: str | None = None,
) -> None:
self.state.set_entry(bucket, user_id, value)
self._set_order_flow_snapshot(
self._order_flow_state_support.set_flow_entry(
bucket=bucket,
user_id=user_id,
snapshot_key=snapshot_key,
value=value,
@ -132,14 +85,12 @@ class OrderFlowMixin:
*,
active_task: str | None = None,
) -> dict | None:
entry = self.state.pop_entry(bucket, user_id)
self._set_order_flow_snapshot(
return self._order_flow_state_support.pop_flow_entry(
bucket=bucket,
user_id=user_id,
snapshot_key=snapshot_key,
value=None,
active_task=active_task,
)
return entry
def _decision_intent(self, turn_decision: dict | None) -> str:
return str((turn_decision or {}).get("intent") or "").strip().lower()
@ -275,89 +226,40 @@ class OrderFlowMixin:
db.close()
def _get_last_stock_results(self, user_id: int | None) -> list[dict]:
pending_selection = self.state.get_entry("pending_stock_selections", user_id, expire=True)
if isinstance(pending_selection, dict):
payload = pending_selection.get("payload")
if isinstance(payload, list):
sanitized = self._sanitize_stock_results(payload)
if sanitized:
return sanitized
context = self._get_user_context(user_id)
if not context:
return []
stock_results = context.get("last_stock_results") or []
return self._sanitize_stock_results(stock_results if isinstance(stock_results, list) else [])
return self._order_flow_state_support.get_last_stock_results(user_id=user_id)
def _store_pending_stock_selection(self, user_id: int | None, stock_results: list[dict] | None) -> None:
if user_id is None:
return
sanitized = self._sanitize_stock_results(stock_results)
if not sanitized:
self.state.pop_entry("pending_stock_selections", user_id)
return
self.state.set_entry(
"pending_stock_selections",
user_id,
{
"payload": sanitized,
"expires_at": utc_now() + timedelta(minutes=PENDING_ORDER_SELECTION_TTL_MINUTES),
},
self._order_flow_state_support.store_pending_stock_selection(
user_id=user_id,
stock_results=stock_results,
)
def _get_selected_vehicle(self, user_id: int | None) -> dict | None:
context = self._get_user_context(user_id)
if not context:
return None
selected_vehicle = context.get("selected_vehicle")
return dict(selected_vehicle) if isinstance(selected_vehicle, dict) else None
return self._order_flow_state_support.get_selected_vehicle(user_id=user_id)
def _get_pending_single_vehicle_confirmation(self, user_id: int | None) -> dict | None:
context = self._get_user_context(user_id)
if not context:
return None
pending_vehicle = context.get("pending_single_vehicle_confirmation")
return dict(pending_vehicle) if isinstance(pending_vehicle, dict) else None
return self._order_flow_state_support.get_pending_single_vehicle_confirmation(user_id=user_id)
def _remember_stock_results(self, user_id: int | None, stock_results: list[dict] | None) -> None:
context = self._get_user_context(user_id)
if not context:
return
sanitized = self._sanitize_stock_results(stock_results)
context["last_stock_results"] = sanitized
self._store_pending_stock_selection(user_id=user_id, stock_results=sanitized)
if sanitized:
context["selected_vehicle"] = None
context["pending_single_vehicle_confirmation"] = None
self._save_user_context(user_id=user_id, context=context)
self._order_flow_state_support.remember_stock_results(
user_id=user_id,
stock_results=stock_results,
)
def _store_selected_vehicle(self, user_id: int | None, vehicle: dict | None) -> None:
if user_id is None:
return
context = self._get_user_context(user_id)
if not context:
return
context["selected_vehicle"] = dict(vehicle) if isinstance(vehicle, dict) else None
context["pending_single_vehicle_confirmation"] = None
self.state.pop_entry("pending_stock_selections", user_id)
self._save_user_context(user_id=user_id, context=context)
self._order_flow_state_support.store_selected_vehicle(
user_id=user_id,
vehicle=vehicle,
)
def _store_pending_single_vehicle_confirmation(self, user_id: int | None, vehicle: dict | None) -> None:
if user_id is None:
return
context = self._get_user_context(user_id)
if not context:
return
context["pending_single_vehicle_confirmation"] = dict(vehicle) if isinstance(vehicle, dict) else None
self._save_user_context(user_id=user_id, context=context)
self._order_flow_state_support.store_pending_single_vehicle_confirmation(
user_id=user_id,
vehicle=vehicle,
)
def _clear_pending_single_vehicle_confirmation(self, user_id: int | None) -> None:
if user_id is None:
return
context = self._get_user_context(user_id)
if not isinstance(context, dict):
return
context["pending_single_vehicle_confirmation"] = None
self._save_user_context(user_id=user_id, context=context)
self._order_flow_state_support.clear_pending_single_vehicle_confirmation(user_id=user_id)
def _vehicle_to_payload(self, vehicle: dict) -> dict:
return {
@ -1192,3 +1094,4 @@ class OrderFlowMixin:
return self._fallback_format_tool_result("cancelar_pedido", tool_result)

@ -0,0 +1,117 @@
from __future__ import annotations
from datetime import timedelta
from app.core.time_utils import utc_now
from app.services.flows.flow_state_support import FlowStateSupport
from app.services.orchestration.orchestrator_config import PENDING_ORDER_SELECTION_TTL_MINUTES
class OrderFlowStateSupport(FlowStateSupport):
"""Concentra estado, snapshots e selecoes do fluxo de vendas."""
def sanitize_stock_results(self, stock_results: list[dict] | None) -> list[dict]:
sanitized: list[dict] = []
for item in stock_results or []:
if not isinstance(item, dict):
continue
try:
vehicle_id = int(item.get("id"))
preco = float(item.get("preco") or 0)
except (TypeError, ValueError):
continue
sanitized.append(
{
"id": vehicle_id,
"modelo": str(item.get("modelo") or "").strip(),
"categoria": str(item.get("categoria") or "").strip(),
"preco": preco,
"budget_relaxed": bool(item.get("budget_relaxed", False)),
}
)
return sanitized
def get_last_stock_results(self, user_id: int | None) -> list[dict]:
pending_selection = self.get_state_entry("pending_stock_selections", user_id, expire=True)
if isinstance(pending_selection, dict):
payload = pending_selection.get("payload")
if isinstance(payload, list):
sanitized = self.sanitize_stock_results(payload)
if sanitized:
return sanitized
context = self.service._get_user_context(user_id)
if not context:
return []
stock_results = context.get("last_stock_results") or []
return self.sanitize_stock_results(stock_results if isinstance(stock_results, list) else [])
def store_pending_stock_selection(self, user_id: int | None, stock_results: list[dict] | None) -> None:
if user_id is None:
return
sanitized = self.sanitize_stock_results(stock_results)
if not sanitized:
self.pop_state_entry("pending_stock_selections", user_id)
return
self.set_state_entry(
"pending_stock_selections",
user_id,
{
"payload": sanitized,
"expires_at": utc_now() + timedelta(minutes=PENDING_ORDER_SELECTION_TTL_MINUTES),
},
)
def get_selected_vehicle(self, user_id: int | None) -> dict | None:
context = self.service._get_user_context(user_id)
if not context:
return None
selected_vehicle = context.get("selected_vehicle")
return dict(selected_vehicle) if isinstance(selected_vehicle, dict) else None
def get_pending_single_vehicle_confirmation(self, user_id: int | None) -> dict | None:
context = self.service._get_user_context(user_id)
if not context:
return None
pending_vehicle = context.get("pending_single_vehicle_confirmation")
return dict(pending_vehicle) if isinstance(pending_vehicle, dict) else None
def remember_stock_results(self, user_id: int | None, stock_results: list[dict] | None) -> None:
context = self.service._get_user_context(user_id)
if not context:
return
sanitized = self.sanitize_stock_results(stock_results)
context["last_stock_results"] = sanitized
self.store_pending_stock_selection(user_id=user_id, stock_results=sanitized)
if sanitized:
context["selected_vehicle"] = None
context["pending_single_vehicle_confirmation"] = None
self.service._save_user_context(user_id=user_id, context=context)
def store_selected_vehicle(self, user_id: int | None, vehicle: dict | None) -> None:
if user_id is None:
return
context = self.service._get_user_context(user_id)
if not context:
return
context["selected_vehicle"] = dict(vehicle) if isinstance(vehicle, dict) else None
context["pending_single_vehicle_confirmation"] = None
self.pop_state_entry("pending_stock_selections", user_id)
self.service._save_user_context(user_id=user_id, context=context)
def store_pending_single_vehicle_confirmation(self, user_id: int | None, vehicle: dict | None) -> None:
if user_id is None:
return
context = self.service._get_user_context(user_id)
if not context:
return
context["pending_single_vehicle_confirmation"] = dict(vehicle) if isinstance(vehicle, dict) else None
self.service._save_user_context(user_id=user_id, context=context)
def clear_pending_single_vehicle_confirmation(self, user_id: int | None) -> None:
if user_id is None:
return
context = self.service._get_user_context(user_id)
if not isinstance(context, dict):
return
context["pending_single_vehicle_confirmation"] = None
self.service._save_user_context(user_id=user_id, context=context)

@ -7,185 +7,78 @@ from app.core.time_utils import utc_now
from app.services.orchestration import technical_normalizer
from app.services.orchestration.orchestrator_config import (
PENDING_RENTAL_DRAFT_TTL_MINUTES,
PENDING_RENTAL_SELECTION_TTL_MINUTES,
RENTAL_REQUIRED_FIELDS,
)
from app.services.flows.rental_flow_support import RentalFlowStateSupport
class RentalFlowMixin:
@property
def _rental_flow_state_support(self) -> RentalFlowStateSupport:
support = getattr(self, "__rental_flow_state_support", None)
if support is None:
support = RentalFlowStateSupport(self)
setattr(self, "__rental_flow_state_support", support)
return support
# Sanitiza resultados da frota antes de guardar no contexto.
def _sanitize_rental_results(self, rental_results: list[dict] | None) -> list[dict]:
sanitized: list[dict] = []
for item in rental_results or []:
if not isinstance(item, dict):
continue
try:
rental_vehicle_id = int(item.get("id"))
valor_diaria = float(item.get("valor_diaria") or 0)
ano = int(item.get("ano")) if item.get("ano") is not None else None
except (TypeError, ValueError):
continue
placa = technical_normalizer.normalize_plate(item.get("placa"))
if not placa:
continue
sanitized.append(
{
"id": rental_vehicle_id,
"placa": placa,
"modelo": str(item.get("modelo") or "").strip(),
"categoria": str(item.get("categoria") or "").strip().lower(),
"ano": ano,
"valor_diaria": valor_diaria,
"status": str(item.get("status") or "").strip().lower() or "disponivel",
}
)
return sanitized
return self._rental_flow_state_support.sanitize_rental_results(rental_results)
# Marca locacao como dominio ativo na conversa do usuario.
def _mark_rental_flow_active(self, user_id: int | None, *, active_task: str | None = None) -> None:
if user_id is None:
return
context = self._get_user_context(user_id)
if not isinstance(context, dict):
return
context["active_domain"] = "rental"
if active_task is not None:
context["active_task"] = active_task
self._save_user_context(user_id=user_id, context=context)
self._rental_flow_state_support.mark_rental_flow_active(
user_id=user_id,
active_task=active_task,
)
# Recupera a ultima lista de veiculos disponiveis para locacao.
def _get_last_rental_results(self, user_id: int | None) -> list[dict]:
pending_selection = self.state.get_entry("pending_rental_selections", user_id, expire=True)
if isinstance(pending_selection, dict):
payload = pending_selection.get("payload")
if isinstance(payload, list):
sanitized = self._sanitize_rental_results(payload)
if sanitized:
return sanitized
context = self._get_user_context(user_id)
if not isinstance(context, dict):
return []
rental_results = context.get("last_rental_results") or []
return self._sanitize_rental_results(rental_results if isinstance(rental_results, list) else [])
return self._rental_flow_state_support.get_last_rental_results(user_id=user_id)
# Guarda a lista atual para permitir selecao do veiculo em mensagens seguintes.
def _store_pending_rental_selection(self, user_id: int | None, rental_results: list[dict] | None) -> None:
if user_id is None:
return
sanitized = self._sanitize_rental_results(rental_results)
if not sanitized:
self.state.pop_entry("pending_rental_selections", user_id)
return
self.state.set_entry(
"pending_rental_selections",
user_id,
{
"payload": sanitized,
"expires_at": utc_now() + timedelta(minutes=PENDING_RENTAL_SELECTION_TTL_MINUTES),
},
self._rental_flow_state_support.store_pending_rental_selection(
user_id=user_id,
rental_results=rental_results,
)
# Le o veiculo de locacao escolhido que ficou salvo no contexto.
def _get_selected_rental_vehicle(self, user_id: int | None) -> dict | None:
context = self._get_user_context(user_id)
if not isinstance(context, dict):
return None
selected_vehicle = context.get("selected_rental_vehicle")
return dict(selected_vehicle) if isinstance(selected_vehicle, dict) else None
return self._rental_flow_state_support.get_selected_rental_vehicle(user_id=user_id)
# Filtra o payload do contrato para manter so dados uteis no contexto.
def _sanitize_rental_contract_snapshot(self, payload) -> dict | None:
if not isinstance(payload, dict):
return None
contract_number = str(payload.get("contrato_numero") or "").strip().upper()
plate = technical_normalizer.normalize_plate(payload.get("placa"))
if not contract_number and not plate:
return None
snapshot: dict = {}
if contract_number:
snapshot["contrato_numero"] = contract_number
if plate:
snapshot["placa"] = plate
for field_name in (
"modelo_veiculo",
"categoria",
"status",
"status_veiculo",
"data_inicio",
"data_fim_prevista",
"data_devolucao",
):
value = str(payload.get(field_name) or "").strip()
if value:
snapshot[field_name] = value
for field_name in ("valor_diaria", "valor_previsto", "valor_final"):
number = technical_normalizer.normalize_positive_number(payload.get(field_name))
if number is not None:
snapshot[field_name] = float(number)
return snapshot
return self._rental_flow_state_support.sanitize_rental_contract_snapshot(payload)
# Recupera o ultimo contrato de locacao lembrado para o usuario.
def _get_last_rental_contract(self, user_id: int | None) -> dict | None:
context = self._get_user_context(user_id)
if not isinstance(context, dict):
return None
contract = context.get("last_rental_contract")
return dict(contract) if isinstance(contract, dict) else None
return self._rental_flow_state_support.get_last_rental_contract(user_id=user_id)
# Atualiza o ultimo contrato de locacao salvo no contexto.
def _store_last_rental_contract(self, user_id: int | None, payload) -> None:
if user_id is None:
return
context = self._get_user_context(user_id)
if not isinstance(context, dict):
return
sanitized = self._sanitize_rental_contract_snapshot(payload)
if sanitized is None:
context.pop("last_rental_contract", None)
else:
context["last_rental_contract"] = sanitized
self._save_user_context(user_id=user_id, context=context)
self._rental_flow_state_support.store_last_rental_contract(
user_id=user_id,
payload=payload,
)
# Persiste a ultima consulta de frota para reuso no fluxo incremental.
def _remember_rental_results(self, user_id: int | None, rental_results: list[dict] | None) -> None:
context = self._get_user_context(user_id)
if not isinstance(context, dict):
return
sanitized = self._sanitize_rental_results(rental_results)
context["last_rental_results"] = sanitized
self._store_pending_rental_selection(user_id=user_id, rental_results=sanitized)
if sanitized:
context["selected_rental_vehicle"] = None
context["active_domain"] = "rental"
self._save_user_context(user_id=user_id, context=context)
self._rental_flow_state_support.remember_rental_results(
user_id=user_id,
rental_results=rental_results,
)
# Salva o veiculo escolhido e encerra a etapa de selecao pendente.
def _store_selected_rental_vehicle(self, user_id: int | None, vehicle: dict | None) -> None:
if user_id is None:
return
context = self._get_user_context(user_id)
if not isinstance(context, dict):
return
context["selected_rental_vehicle"] = dict(vehicle) if isinstance(vehicle, dict) else None
context["active_domain"] = "rental"
self.state.pop_entry("pending_rental_selections", user_id)
self._save_user_context(user_id=user_id, context=context)
self._rental_flow_state_support.store_selected_rental_vehicle(
user_id=user_id,
vehicle=vehicle,
)
# Converte um veiculo selecionado no payload esperado pela abertura da locacao.
def _rental_vehicle_to_payload(self, vehicle: dict) -> dict:
return {
"rental_vehicle_id": int(vehicle["id"]),
"placa": str(vehicle["placa"]),
"modelo_veiculo": str(vehicle["modelo"]),
"categoria": str(vehicle.get("categoria") or ""),
"valor_diaria": round(float(vehicle.get("valor_diaria") or 0), 2),
}
return self._rental_flow_state_support.rental_vehicle_to_payload(vehicle)
# Extrai a categoria de locacao mencionada livremente pelo usuario.
def _extract_rental_category_from_text(self, text: str) -> str | None:
@ -720,3 +613,4 @@ class RentalFlowMixin:
self._store_last_rental_contract(user_id=user_id, payload=tool_result)
self._reset_pending_rental_states(user_id=user_id)
return self._fallback_format_tool_result("abrir_locacao_aluguel", tool_result)

@ -0,0 +1,174 @@
from __future__ import annotations
from datetime import timedelta
from app.core.time_utils import utc_now
from app.services.flows.flow_state_support import FlowStateSupport
from app.services.orchestration import technical_normalizer
from app.services.orchestration.orchestrator_config import PENDING_RENTAL_SELECTION_TTL_MINUTES
class RentalFlowStateSupport(FlowStateSupport):
"""Concentra estado e contexto incremental do fluxo de locacao."""
def sanitize_rental_results(self, rental_results: list[dict] | None) -> list[dict]:
sanitized: list[dict] = []
for item in rental_results or []:
if not isinstance(item, dict):
continue
try:
rental_vehicle_id = int(item.get("id"))
valor_diaria = float(item.get("valor_diaria") or 0)
ano = int(item.get("ano")) if item.get("ano") is not None else None
except (TypeError, ValueError):
continue
placa = technical_normalizer.normalize_plate(item.get("placa"))
if not placa:
continue
sanitized.append(
{
"id": rental_vehicle_id,
"placa": placa,
"modelo": str(item.get("modelo") or "").strip(),
"categoria": str(item.get("categoria") or "").strip().lower(),
"ano": ano,
"valor_diaria": valor_diaria,
"status": str(item.get("status") or "").strip().lower() or "disponivel",
}
)
return sanitized
def mark_rental_flow_active(self, user_id: int | None, *, active_task: str | None = None) -> None:
if user_id is None:
return
context = self.service._get_user_context(user_id)
if not isinstance(context, dict):
return
context["active_domain"] = "rental"
if active_task is not None:
context["active_task"] = active_task
self.service._save_user_context(user_id=user_id, context=context)
def get_last_rental_results(self, user_id: int | None) -> list[dict]:
pending_selection = self.get_state_entry("pending_rental_selections", user_id, expire=True)
if isinstance(pending_selection, dict):
payload = pending_selection.get("payload")
if isinstance(payload, list):
sanitized = self.sanitize_rental_results(payload)
if sanitized:
return sanitized
context = self.service._get_user_context(user_id)
if not isinstance(context, dict):
return []
rental_results = context.get("last_rental_results") or []
return self.sanitize_rental_results(rental_results if isinstance(rental_results, list) else [])
def store_pending_rental_selection(self, user_id: int | None, rental_results: list[dict] | None) -> None:
if user_id is None:
return
sanitized = self.sanitize_rental_results(rental_results)
if not sanitized:
self.pop_state_entry("pending_rental_selections", user_id)
return
self.set_state_entry(
"pending_rental_selections",
user_id,
{
"payload": sanitized,
"expires_at": utc_now() + timedelta(minutes=PENDING_RENTAL_SELECTION_TTL_MINUTES),
},
)
def get_selected_rental_vehicle(self, user_id: int | None) -> dict | None:
context = self.service._get_user_context(user_id)
if not isinstance(context, dict):
return None
selected_vehicle = context.get("selected_rental_vehicle")
return dict(selected_vehicle) if isinstance(selected_vehicle, dict) else None
def sanitize_rental_contract_snapshot(self, payload) -> dict | None:
if not isinstance(payload, dict):
return None
contract_number = str(payload.get("contrato_numero") or "").strip().upper()
plate = technical_normalizer.normalize_plate(payload.get("placa"))
if not contract_number and not plate:
return None
snapshot: dict = {}
if contract_number:
snapshot["contrato_numero"] = contract_number
if plate:
snapshot["placa"] = plate
for field_name in (
"modelo_veiculo",
"categoria",
"status",
"status_veiculo",
"data_inicio",
"data_fim_prevista",
"data_devolucao",
):
value = str(payload.get(field_name) or "").strip()
if value:
snapshot[field_name] = value
for field_name in ("valor_diaria", "valor_previsto", "valor_final"):
number = technical_normalizer.normalize_positive_number(payload.get(field_name))
if number is not None:
snapshot[field_name] = float(number)
return snapshot
def get_last_rental_contract(self, user_id: int | None) -> dict | None:
context = self.service._get_user_context(user_id)
if not isinstance(context, dict):
return None
contract = context.get("last_rental_contract")
return dict(contract) if isinstance(contract, dict) else None
def store_last_rental_contract(self, user_id: int | None, payload) -> None:
if user_id is None:
return
context = self.service._get_user_context(user_id)
if not isinstance(context, dict):
return
sanitized = self.sanitize_rental_contract_snapshot(payload)
if sanitized is None:
context.pop("last_rental_contract", None)
else:
context["last_rental_contract"] = sanitized
self.service._save_user_context(user_id=user_id, context=context)
def remember_rental_results(self, user_id: int | None, rental_results: list[dict] | None) -> None:
context = self.service._get_user_context(user_id)
if not isinstance(context, dict):
return
sanitized = self.sanitize_rental_results(rental_results)
context["last_rental_results"] = sanitized
self.store_pending_rental_selection(user_id=user_id, rental_results=sanitized)
if sanitized:
context["selected_rental_vehicle"] = None
context["active_domain"] = "rental"
self.service._save_user_context(user_id=user_id, context=context)
def store_selected_rental_vehicle(self, user_id: int | None, vehicle: dict | None) -> None:
if user_id is None:
return
context = self.service._get_user_context(user_id)
if not isinstance(context, dict):
return
context["selected_rental_vehicle"] = dict(vehicle) if isinstance(vehicle, dict) else None
context["active_domain"] = "rental"
self.pop_state_entry("pending_rental_selections", user_id)
self.service._save_user_context(user_id=user_id, context=context)
def rental_vehicle_to_payload(self, vehicle: dict) -> dict:
return {
"rental_vehicle_id": int(vehicle["id"]),
"placa": str(vehicle["placa"]),
"modelo_veiculo": str(vehicle["modelo"]),
"categoria": str(vehicle.get("categoria") or ""),
"valor_diaria": round(float(vehicle.get("valor_diaria") or 0), 2),
}

@ -5,31 +5,30 @@ from app.core.time_utils import utc_now
from fastapi import HTTPException
from app.services.orchestration.orchestrator_config import (
LAST_REVIEW_PACKAGE_TTL_MINUTES,
PENDING_REVIEW_DRAFT_TTL_MINUTES,
REVIEW_REQUIRED_FIELDS,
)
from app.services.flows.review_flow_support import ReviewFlowStateSupport
# Esse mixin concentra os fluxos incrementais de revisao e pos-venda.
class ReviewFlowMixin:
@property
def _review_flow_state_support(self) -> ReviewFlowStateSupport:
support = getattr(self, "__review_flow_state_support", None)
if support is None:
support = ReviewFlowStateSupport(self)
setattr(self, "__review_flow_state_support", support)
return support
def _review_now(self) -> datetime:
provider = getattr(self, "_review_now_provider", None)
if callable(provider):
return provider()
return datetime.now()
return self._review_flow_state_support.review_now()
def _get_review_flow_snapshot(self, user_id: int | None, snapshot_key: str) -> dict | None:
if user_id is None or not hasattr(self, "_get_user_context"):
return None
context = self._get_user_context(user_id)
if not isinstance(context, dict):
return None
snapshots = context.get("flow_snapshots")
if not isinstance(snapshots, dict):
return None
snapshot = snapshots.get(snapshot_key)
return dict(snapshot) if isinstance(snapshot, dict) else None
return self._review_flow_state_support.get_flow_snapshot(
user_id=user_id,
snapshot_key=snapshot_key,
)
def _set_review_flow_snapshot(
self,
@ -39,51 +38,19 @@ class ReviewFlowMixin:
*,
active_task: str | None = None,
) -> None:
if user_id is None or not hasattr(self, "_get_user_context") or not hasattr(self, "_save_user_context"):
return
context = self._get_user_context(user_id)
if not isinstance(context, dict):
return
snapshots = context.get("flow_snapshots")
if not isinstance(snapshots, dict):
snapshots = {}
context["flow_snapshots"] = snapshots
if isinstance(value, dict):
snapshots[snapshot_key] = value
if active_task:
context["active_task"] = active_task
collected_slots = context.get("collected_slots")
if not isinstance(collected_slots, dict):
collected_slots = {}
context["collected_slots"] = collected_slots
payload = value.get("payload")
if isinstance(payload, dict):
collected_slots[active_task] = dict(payload)
else:
snapshots.pop(snapshot_key, None)
if active_task and context.get("active_task") == active_task:
context["active_task"] = None
collected_slots = context.get("collected_slots")
if isinstance(collected_slots, dict) and active_task:
collected_slots.pop(active_task, None)
self._save_user_context(user_id=user_id, context=context)
self._review_flow_state_support.set_flow_snapshot(
user_id=user_id,
snapshot_key=snapshot_key,
value=value,
active_task=active_task,
)
def _get_review_flow_entry(self, bucket: str, user_id: int | None, snapshot_key: str) -> dict | None:
entry = self.state.get_entry(bucket, user_id, expire=True)
if entry:
return entry
snapshot = self._get_review_flow_snapshot(user_id=user_id, snapshot_key=snapshot_key)
if not snapshot:
return None
if snapshot.get("expires_at") and snapshot["expires_at"] < utc_now():
self._set_review_flow_snapshot(user_id=user_id, snapshot_key=snapshot_key, value=None)
return None
self.state.set_entry(bucket, user_id, snapshot)
return snapshot
return self._review_flow_state_support.get_flow_entry(
bucket=bucket,
user_id=user_id,
snapshot_key=snapshot_key,
)
def _set_review_flow_entry(
self,
@ -94,8 +61,8 @@ class ReviewFlowMixin:
*,
active_task: str | None = None,
) -> None:
self.state.set_entry(bucket, user_id, value)
self._set_review_flow_snapshot(
self._review_flow_state_support.set_flow_entry(
bucket=bucket,
user_id=user_id,
snapshot_key=snapshot_key,
value=value,
@ -110,14 +77,12 @@ class ReviewFlowMixin:
*,
active_task: str | None = None,
) -> dict | None:
entry = self.state.pop_entry(bucket, user_id)
self._set_review_flow_snapshot(
return self._review_flow_state_support.pop_flow_entry(
bucket=bucket,
user_id=user_id,
snapshot_key=snapshot_key,
value=None,
active_task=active_task,
)
return entry
def _decision_intent(self, turn_decision: dict | None) -> str:
return str((turn_decision or {}).get("intent") or "").strip().lower()
@ -128,22 +93,14 @@ class ReviewFlowMixin:
payload: dict | None = None,
missing_fields: list[str] | None = None,
) -> None:
if not hasattr(self, "_log_turn_event"):
return
self._log_turn_event(
"review_flow_progress",
review_flow_source=source,
payload_keys=sorted((payload or {}).keys()),
missing_fields=list(missing_fields or []),
self._review_flow_state_support.log_review_flow_source(
source=source,
payload=payload,
missing_fields=missing_fields,
)
def _active_domain(self, user_id: int | None) -> str:
if user_id is None or not hasattr(self, "_get_user_context"):
return "general"
context = self._get_user_context(user_id)
if not isinstance(context, dict):
return "general"
return str(context.get("active_domain") or "general").strip().lower()
return self._review_flow_state_support.active_domain(user_id=user_id)
def _clean_review_model_candidate(self, raw_model: str | None) -> str | None:
text = str(raw_model or "").strip(" ,.;:-")
@ -659,38 +616,13 @@ class ReviewFlowMixin:
)
def _store_last_review_package(self, user_id: int | None, payload: dict | None) -> None:
if user_id is None or not isinstance(payload, dict):
return
# Guarda um pacote reutilizavel do ultimo veiculo informado
# para reduzir repeticao em novos agendamentos.
package = {
"placa": payload.get("placa"),
"modelo": payload.get("modelo"),
"ano": payload.get("ano"),
"km": payload.get("km"),
"revisao_previa_concessionaria": payload.get("revisao_previa_concessionaria"),
}
sanitized = {k: v for k, v in package.items() if v is not None}
required = {"placa", "modelo", "ano", "km", "revisao_previa_concessionaria"}
if not required.issubset(sanitized.keys()):
return
self.state.set_entry(
"last_review_packages",
user_id,
{
"payload": sanitized,
"expires_at": utc_now() + timedelta(minutes=LAST_REVIEW_PACKAGE_TTL_MINUTES),
},
self._review_flow_state_support.store_last_review_package(
user_id=user_id,
payload=payload,
)
def _get_last_review_package(self, user_id: int | None) -> dict | None:
if user_id is None:
return None
cached = self.state.get_entry("last_review_packages", user_id, expire=True)
if not cached:
return None
payload = cached.get("payload")
return dict(payload) if isinstance(payload, dict) else None
return self._review_flow_state_support.get_last_review_package(user_id=user_id)
async def _try_collect_and_schedule_review(
self,
@ -968,3 +900,4 @@ class ReviewFlowMixin:
self._store_last_review_package(user_id=user_id, payload=draft["payload"])
self._log_review_flow_source(source=review_flow_source or "draft", payload=draft["payload"])
return self._fallback_format_tool_result("agendar_revisao", tool_result)

@ -0,0 +1,72 @@
from __future__ import annotations
from datetime import datetime, timedelta
from app.core.time_utils import utc_now
from app.services.flows.flow_state_support import FlowStateSupport
from app.services.orchestration.orchestrator_config import LAST_REVIEW_PACKAGE_TTL_MINUTES
class ReviewFlowStateSupport(FlowStateSupport):
"""Concentra estado e utilitarios de suporte do fluxo de revisao."""
def review_now(self) -> datetime:
provider = getattr(self.service, "_review_now_provider", None)
if callable(provider):
return provider()
return datetime.now()
def log_review_flow_source(
self,
source: str,
payload: dict | None = None,
missing_fields: list[str] | None = None,
) -> None:
if not hasattr(self.service, "_log_turn_event"):
return
self.service._log_turn_event(
"review_flow_progress",
review_flow_source=source,
payload_keys=sorted((payload or {}).keys()),
missing_fields=list(missing_fields or []),
)
def active_domain(self, user_id: int | None) -> str:
if user_id is None or not hasattr(self.service, "_get_user_context"):
return "general"
context = self.service._get_user_context(user_id)
if not isinstance(context, dict):
return "general"
return str(context.get("active_domain") or "general").strip().lower()
def store_last_review_package(self, user_id: int | None, payload: dict | None) -> None:
if user_id is None or not isinstance(payload, dict):
return
package = {
"placa": payload.get("placa"),
"modelo": payload.get("modelo"),
"ano": payload.get("ano"),
"km": payload.get("km"),
"revisao_previa_concessionaria": payload.get("revisao_previa_concessionaria"),
}
sanitized = {key: value for key, value in package.items() if value is not None}
required = {"placa", "modelo", "ano", "km", "revisao_previa_concessionaria"}
if not required.issubset(sanitized.keys()):
return
self.set_state_entry(
"last_review_packages",
user_id,
{
"payload": sanitized,
"expires_at": utc_now() + timedelta(minutes=LAST_REVIEW_PACKAGE_TTL_MINUTES),
},
)
def get_last_review_package(self, user_id: int | None) -> dict | None:
if user_id is None:
return None
cached = self.get_state_entry("last_review_packages", user_id, expire=True)
if not cached:
return None
payload = cached.get("payload")
return dict(payload) if isinstance(payload, dict) else None

@ -0,0 +1,420 @@
from __future__ import annotations
from typing import Any
from fastapi import HTTPException
from app.services.orchestration.orchestrator_config import USER_CONTEXT_TTL_MINUTES
class OrchestratorContextManager:
"""Agrupa a gestao de contexto e efeitos colaterais do turno."""
def __init__(self, service) -> None:
self.service = service
def upsert_user_context(self, user_id: int | None) -> None:
override = self.service.__dict__.get("_upsert_user_context")
if callable(override):
override(user_id)
return
state = getattr(self.service, "state", None)
if state is None:
return
state.upsert_user_context(
user_id=user_id,
ttl_minutes=USER_CONTEXT_TTL_MINUTES,
)
def get_user_context(self, user_id: int | None) -> dict | None:
override = self.service.__dict__.get("_get_user_context")
if callable(override):
return override(user_id)
state = getattr(self.service, "state", None)
if state is None:
return None
return state.get_user_context(user_id)
def save_user_context(self, user_id: int | None, context: dict | None) -> None:
if user_id is None or not isinstance(context, dict):
return
override = self.service.__dict__.get("_save_user_context")
if callable(override):
override(user_id, context)
return
state = getattr(self.service, "state", None)
if state is None:
return
state.save_user_context(user_id=user_id, context=context)
def extract_generic_memory_fields(self, llm_generic_fields: dict | None = None) -> dict:
extracted: dict[str, Any] = {}
llm_fields = llm_generic_fields or {}
normalized_plate = self.service._normalize_plate(llm_fields.get("placa"))
if normalized_plate:
extracted["placa"] = normalized_plate
normalized_cpf = self.service._normalize_cpf(llm_fields.get("cpf"))
if normalized_cpf:
extracted["cpf"] = normalized_cpf
normalized_budget = self.service._normalize_positive_number(llm_fields.get("orcamento_max"))
if normalized_budget:
extracted["orcamento_max"] = int(round(normalized_budget))
normalized_profile = self.service._normalize_vehicle_profile(llm_fields.get("perfil_veiculo"))
if normalized_profile:
extracted["perfil_veiculo"] = normalized_profile
return extracted
def capture_generic_memory(
self,
user_id: int | None,
llm_generic_fields: dict | None = None,
) -> None:
context = self.get_user_context(user_id)
if not context:
return
fields = self.extract_generic_memory_fields(llm_generic_fields=llm_generic_fields)
if fields:
context["generic_memory"].update(fields)
context.setdefault("shared_memory", {}).update(fields)
self.save_user_context(user_id=user_id, context=context)
def capture_tool_result_context(
self,
tool_name: str,
tool_result,
user_id: int | None,
) -> None:
context = self.get_user_context(user_id)
if not context:
return
context["last_tool_result"] = {
"tool_name": tool_name,
"result_type": type(tool_result).__name__,
}
if tool_name == "consultar_frota_aluguel" and isinstance(tool_result, list):
sanitized_rental = self.service._sanitize_rental_results(tool_result[:20])
context["last_rental_results"] = sanitized_rental
self.service._store_pending_rental_selection(
user_id=user_id,
rental_results=sanitized_rental,
)
if sanitized_rental:
context["selected_rental_vehicle"] = None
context["active_domain"] = "rental"
self.save_user_context(user_id=user_id, context=context)
return
if tool_name != "consultar_estoque" or not isinstance(tool_result, list):
self.save_user_context(user_id=user_id, context=context)
return
sanitized = self.service._sanitize_stock_results(tool_result[:5])
context["last_stock_results"] = sanitized
self.service._store_pending_stock_selection(
user_id=user_id,
stock_results=sanitized,
)
if sanitized:
context["selected_vehicle"] = None
self.save_user_context(user_id=user_id, context=context)
def capture_successful_tool_side_effects(
self,
tool_name: str,
arguments: dict | None,
tool_result,
user_id: int | None,
) -> None:
if tool_name == "agendar_revisao" and isinstance(arguments, dict):
self.service._store_last_review_package(user_id=user_id, payload=arguments)
if tool_name in {
"abrir_locacao_aluguel",
"registrar_devolucao_aluguel",
"registrar_pagamento_aluguel",
"registrar_multa_aluguel",
} and isinstance(tool_result, dict):
self.service._store_last_rental_contract(user_id=user_id, payload=tool_result)
self.capture_tool_result_context(
tool_name=tool_name,
tool_result=tool_result,
user_id=user_id,
)
async def maybe_build_stock_suggestion_response(
self,
tool_name: str,
arguments: dict | None,
tool_result,
user_id: int | None,
) -> str | None:
if tool_name != "consultar_estoque" or not isinstance(tool_result, list) or tool_result:
return None
budget = self.service._normalize_positive_number((arguments or {}).get("preco_max"))
if not budget:
return None
relaxed_arguments = dict(arguments or {})
relaxed_arguments["preco_max"] = max(float(budget) * 1.2, float(budget) + 10000.0)
relaxed_arguments["limite"] = min(max(int((arguments or {}).get("limite") or 5), 1), 5)
relaxed_arguments["ordenar_preco"] = "asc"
try:
relaxed_result = await self.service.tool_executor.execute(
"consultar_estoque",
relaxed_arguments,
user_id=user_id,
)
except HTTPException:
return None
if not isinstance(relaxed_result, list):
return None
nearby = []
for item in relaxed_result:
if not isinstance(item, dict):
continue
try:
price = float(item.get("preco") or 0)
except (TypeError, ValueError):
continue
if price > float(budget):
nearby.append(item)
if not nearby:
return None
nearby = [{**item, "budget_relaxed": True} for item in nearby]
self.capture_tool_result_context(
tool_name="consultar_estoque",
tool_result=nearby,
user_id=user_id,
)
budget_label = f"R$ {float(budget):,.0f}".replace(",", ".")
lines = [f"Nao encontrei veiculos ate {budget_label}."]
lines.append("Mas achei algumas opcoes proximas ao seu orcamento:")
for idx, item in enumerate(nearby[:5], start=1):
modelo = str(item.get("modelo") or "N/A")
categoria = str(item.get("categoria") or "N/A")
codigo = item.get("id", "N/A")
preco = f"R$ {float(item.get('preco') or 0):,.2f}".replace(",", "X").replace(".", ",").replace("X", ".")
lines.append(f"{idx}. [{codigo}] {modelo} ({categoria}) - {preco}")
lines.append("Se quiser, responda com o numero da lista ou com o modelo.")
return "\n".join(lines)
def new_tab_memory(self, user_id: int | None) -> dict:
context = self.get_user_context(user_id)
if not context:
return {}
shared = context.get("shared_memory", {})
if not isinstance(shared, dict):
return {}
return dict(shared)
def reset_pending_rental_states(self, user_id: int | None) -> None:
if user_id is None:
return
self.service.state.pop_entry("pending_rental_drafts", user_id)
self.service.state.pop_entry("pending_rental_selections", user_id)
context = self.get_user_context(user_id)
if isinstance(context, dict):
context["last_rental_results"] = []
context["selected_rental_vehicle"] = None
if context.get("active_task") == "rental_create":
context["active_task"] = None
if str(context.get("active_domain") or "").strip().lower() == "rental":
context["active_domain"] = "general"
self.save_user_context(user_id=user_id, context=context)
def reset_pending_review_states(self, user_id: int | None) -> None:
if user_id is None:
return
self.service.state.pop_entry("pending_review_drafts", user_id)
self.service.state.pop_entry("pending_review_confirmations", user_id)
self.service.state.pop_entry("pending_review_management_drafts", user_id)
self.service.state.pop_entry("pending_review_reuse_confirmations", user_id)
context = self.get_user_context(user_id)
if isinstance(context, dict):
snapshots = context.get("flow_snapshots")
if isinstance(snapshots, dict):
snapshots.pop("review_schedule", None)
snapshots.pop("review_confirmation", None)
snapshots.pop("review_management", None)
snapshots.pop("review_reuse_confirmation", None)
collected_slots = context.get("collected_slots")
if isinstance(collected_slots, dict):
collected_slots.pop("review_schedule", None)
collected_slots.pop("review_management", None)
if context.get("active_task") in {"review_schedule", "review_management"}:
context["active_task"] = None
self.save_user_context(user_id=user_id, context=context)
def reset_pending_order_states(self, user_id: int | None) -> None:
if user_id is None:
return
self.service.state.pop_entry("pending_order_drafts", user_id)
self.service.state.pop_entry("pending_cancel_order_drafts", user_id)
self.service.state.pop_entry("pending_stock_selections", user_id)
context = self.get_user_context(user_id)
if isinstance(context, dict):
snapshots = context.get("flow_snapshots")
if isinstance(snapshots, dict):
snapshots.pop("order_create", None)
snapshots.pop("order_cancel", None)
collected_slots = context.get("collected_slots")
if isinstance(collected_slots, dict):
collected_slots.pop("order_create", None)
collected_slots.pop("order_cancel", None)
if context.get("active_task") in {"order_create", "order_cancel"}:
context["active_task"] = None
self.save_user_context(user_id=user_id, context=context)
def clear_user_conversation_state(self, user_id: int | None) -> None:
context = self.get_user_context(user_id)
if not context:
return
self.reset_pending_review_states(user_id=user_id)
self.reset_pending_order_states(user_id=user_id)
self.reset_pending_rental_states(user_id=user_id)
self.service.state.pop_entry("last_review_packages", user_id)
context["active_domain"] = "general"
context["active_task"] = None
context["generic_memory"] = {}
context["shared_memory"] = {}
context["collected_slots"] = {}
context["flow_snapshots"] = {}
context["last_tool_result"] = None
context["order_queue"] = []
context["pending_order_selection"] = None
context["pending_switch"] = None
context["last_stock_results"] = []
context["selected_vehicle"] = None
context["last_rental_results"] = []
context["selected_rental_vehicle"] = None
context.pop("last_rental_contract", None)
self.save_user_context(user_id=user_id, context=context)
def clear_pending_order_navigation(self, user_id: int | None) -> int:
context = self.get_user_context(user_id)
if not context:
return 0
dropped = len(context.get("order_queue", []))
if context.get("pending_switch"):
dropped += 1
if context.get("pending_order_selection"):
pending_orders = context["pending_order_selection"].get("orders") or []
dropped += len(pending_orders)
context["order_queue"] = []
context["pending_switch"] = None
context["pending_order_selection"] = None
self.save_user_context(user_id=user_id, context=context)
return dropped
def cancel_active_flow(self, user_id: int | None) -> str:
context = self.get_user_context(user_id)
if not context:
return "Nao havia contexto ativo para cancelar."
active_domain = context.get("active_domain", "general")
had_flow = self.service._has_open_flow(user_id=user_id, domain=active_domain)
if active_domain == "review":
self.reset_pending_review_states(user_id=user_id)
elif active_domain == "sales":
self.reset_pending_order_states(user_id=user_id)
elif active_domain == "rental":
self.reset_pending_rental_states(user_id=user_id)
context["pending_switch"] = None
self.save_user_context(user_id=user_id, context=context)
if had_flow:
return f"Fluxo atual de {self.service._domain_label(active_domain)} cancelado."
return "Nao havia fluxo em andamento para cancelar."
async def continue_next_order_now(self, user_id: int | None) -> str:
context = self.get_user_context(user_id)
if not context:
return "Nao encontrei contexto ativo para continuar."
if context.get("pending_order_selection"):
return "Ainda preciso que voce escolha qual das duas acoes deseja iniciar primeiro."
pending_switch = context.get("pending_switch")
if isinstance(pending_switch, dict):
queued_message = str(pending_switch.get("queued_message") or "").strip()
if queued_message:
target_domain = str(pending_switch.get("target_domain") or "general")
memory_seed = dict(pending_switch.get("memory_seed") or {})
self.service._apply_domain_switch(user_id=user_id, target_domain=target_domain)
refreshed = self.get_user_context(user_id)
if refreshed is not None:
refreshed["generic_memory"] = memory_seed
self.save_user_context(user_id=user_id, context=refreshed)
transition = self.service._build_next_order_transition(target_domain)
next_response = await self.service.handle_message(queued_message, user_id=user_id)
return f"{transition}\n{next_response}"
next_order = self.service._pop_next_order(user_id=user_id)
if not next_order:
return "Nao ha pedidos pendentes na fila para continuar."
target_domain = str(next_order.get("domain") or "general")
memory_seed = dict(next_order.get("memory_seed") or self.new_tab_memory(user_id=user_id))
self.service._apply_domain_switch(user_id=user_id, target_domain=target_domain)
refreshed = self.get_user_context(user_id)
if refreshed is not None:
refreshed["generic_memory"] = memory_seed
self.save_user_context(user_id=user_id, context=refreshed)
transition = self.service._build_next_order_transition(target_domain)
next_response = await self.service.handle_message(
str(next_order.get("message") or ""),
user_id=user_id,
)
return f"{transition}\n{next_response}"
async def tool_limpar_contexto_conversa(
self,
motivo: str | None = None,
user_id: int | None = None,
) -> dict:
self.clear_user_conversation_state(user_id=user_id)
message = "Contexto da conversa limpo. Podemos recomecar do zero."
if motivo:
message = f"{message}\nMotivo registrado: {motivo.strip()}"
return {"message": message}
async def tool_descartar_pedidos_pendentes(
self,
motivo: str | None = None,
user_id: int | None = None,
) -> dict:
dropped = self.clear_pending_order_navigation(user_id=user_id)
if dropped <= 0:
message = "Nao havia pedidos pendentes na fila para descartar."
elif dropped == 1:
message = "Descartei 1 pedido pendente da fila."
else:
message = f"Descartei {dropped} pedidos pendentes da fila."
if motivo:
message = f"{message}\nMotivo registrado: {motivo.strip()}"
return {"message": message}
async def tool_cancelar_fluxo_atual(
self,
motivo: str | None = None,
user_id: int | None = None,
) -> dict:
message = self.cancel_active_flow(user_id=user_id)
if motivo:
message = f"{message}\nMotivo registrado: {motivo.strip()}"
return {"message": message}
async def tool_continuar_proximo_pedido(self, user_id: int | None = None) -> str:
return await self.continue_next_order_now(user_id=user_id)

@ -0,0 +1,296 @@
from __future__ import annotations
import json
import logging
from time import perf_counter
from typing import Any
from fastapi import HTTPException
from app.core.time_utils import utc_now
from app.services.orchestration.entity_normalizer import EntityNormalizer
from app.services.orchestration.orchestrator_config import (
DETERMINISTIC_RESPONSE_TOOLS,
LOW_VALUE_RESPONSES,
ORCHESTRATION_CONTROL_TOOLS,
)
from app.services.orchestration.prompt_builders import (
build_force_tool_prompt,
build_result_prompt,
build_router_prompt,
)
from app.services.orchestration.sensitive_data import mask_sensitive_payload, mask_sensitive_text
logger = logging.getLogger(__name__)
class OrchestratorExecutionManager:
"""Centraliza instrumentacao, prompts e execucao tecnica de tools."""
def __init__(self, service, logger_instance=None) -> None:
self.service = service
self.logger = logger_instance or logger
def build_router_prompt(self, user_message: str, user_id: int | None) -> str:
conversation_context = self.service._build_context_summary(user_id=user_id)
return build_router_prompt(
user_message=user_message,
user_id=user_id,
conversation_context=conversation_context,
)
def build_force_tool_prompt(self, user_message: str, user_id: int | None) -> str:
conversation_context = self.service._build_context_summary(user_id=user_id)
return build_force_tool_prompt(
user_message=user_message,
user_id=user_id,
conversation_context=conversation_context,
)
def build_result_prompt(
self,
user_message: str,
user_id: int | None,
tool_name: str,
tool_result,
) -> str:
conversation_context = self.service._build_context_summary(user_id=user_id)
return build_result_prompt(
user_message=user_message,
user_id=user_id,
tool_name=tool_name,
tool_result=tool_result,
conversation_context=conversation_context,
)
def capture_turn_decision_trace(self, turn_decision: dict | None) -> None:
trace = getattr(self.service, "_turn_trace", None)
if not isinstance(trace, dict) or not isinstance(turn_decision, dict):
return
trace["intent"] = str(turn_decision.get("intent") or "").strip() or None
trace["domain"] = str(turn_decision.get("domain") or "").strip() or None
trace["action"] = str(turn_decision.get("action") or "").strip() or None
def capture_tool_invocation_trace(self, tool_name: str, arguments: dict | None) -> None:
trace = getattr(self.service, "_turn_trace", None)
if not isinstance(trace, dict):
return
trace["tool_name"] = str(tool_name or "").strip() or None
trace["tool_arguments"] = mask_sensitive_payload(dict(arguments or {})) if isinstance(arguments, dict) else None
def finalize_turn_history(
self,
*,
user_message: str,
assistant_response: str | None,
turn_status: str,
error_detail: str | None = None,
) -> None:
history_service = getattr(self.service, "history_service", None)
if history_service is None:
return
trace = getattr(self.service, "_turn_trace", {}) or {}
history_service.record_turn(
request_id=str(trace.get("request_id") or ""),
conversation_id=str(trace.get("conversation_id") or "anonymous"),
user_id=trace.get("user_id"),
user_message=str(user_message or ""),
assistant_response=assistant_response,
turn_status=str(turn_status or "completed"),
intent=trace.get("intent"),
domain=trace.get("domain"),
action=trace.get("action"),
tool_name=trace.get("tool_name"),
tool_arguments=trace.get("tool_arguments"),
error_detail=error_detail,
started_at=trace.get("started_at"),
completed_at=utc_now(),
elapsed_ms=trace.get("elapsed_ms"),
)
def format_turn_error(self, exc: Exception) -> str:
if isinstance(exc, HTTPException):
detail = exc.detail
if isinstance(detail, dict):
return json.dumps(mask_sensitive_payload(detail), ensure_ascii=True, separators=(",", ":"), default=str)
return str(mask_sensitive_text(str(detail)))
return str(mask_sensitive_text(f"{type(exc).__name__}: {exc}"))
def log_turn_event(self, event: str, **payload) -> None:
trace = getattr(self.service, "_turn_trace", {}) or {}
safe_payload = mask_sensitive_payload(
{
"request_id": trace.get("request_id"),
"conversation_id": trace.get("conversation_id"),
**payload,
}
)
self.logger.info(
"turn_event=%s payload=%s",
event,
safe_payload,
)
async def call_llm_with_trace(self, operation: str, message: str, tools):
started_at = perf_counter()
try:
result = await self.service.llm.generate_response(message=message, tools=tools)
elapsed_ms = round((perf_counter() - started_at) * 1000, 2)
self.log_turn_event(
"llm_completed",
operation=operation,
elapsed_ms=elapsed_ms,
tool_call=bool(result.get("tool_call")),
)
return result
except Exception:
elapsed_ms = round((perf_counter() - started_at) * 1000, 2)
self.log_turn_event(
"llm_failed",
operation=operation,
elapsed_ms=elapsed_ms,
)
raise
def merge_pending_draft_tool_arguments(
self,
tool_name: str,
arguments: dict,
user_id: int | None,
) -> dict:
if user_id is None or not isinstance(arguments, dict):
return dict(arguments or {})
if not hasattr(self.service, "state") or self.service.state is None:
return dict(arguments)
bucket_map = {
"agendar_revisao": "pending_review_drafts",
"realizar_pedido": "pending_order_drafts",
"cancelar_pedido": "pending_cancel_order_drafts",
"cancelar_agendamento_revisao": "pending_review_management_drafts",
"editar_data_revisao": "pending_review_management_drafts",
}
bucket = bucket_map.get(tool_name)
if not bucket:
return dict(arguments)
draft = self.service.state.get_entry(bucket, user_id, expire=True)
if not isinstance(draft, dict):
return dict(arguments)
payload = draft.get("payload")
if not isinstance(payload, dict):
return dict(arguments)
merged_arguments = dict(payload)
merged_arguments.update(arguments)
return merged_arguments
def normalize_tool_invocation(
self,
tool_name: str,
arguments: dict | None,
user_id: int | None,
) -> tuple[str, dict]:
normalizer = getattr(self.service, "normalizer", None)
if normalizer is None:
normalizer = EntityNormalizer()
self.service.normalizer = normalizer
normalized_tool_name = normalizer.normalize_tool_name(tool_name) or str(tool_name or "").strip()
normalized_arguments = normalizer.normalize_tool_arguments(normalized_tool_name, arguments or {})
normalized_arguments = self.merge_pending_draft_tool_arguments(
tool_name=normalized_tool_name,
arguments=normalized_arguments,
user_id=user_id,
)
return normalized_tool_name, normalized_arguments
async def execute_tool_with_trace(self, tool_name: str, arguments: dict, user_id: int | None):
tool_name, arguments = self.normalize_tool_invocation(
tool_name=tool_name,
arguments=arguments,
user_id=user_id,
)
self.capture_tool_invocation_trace(tool_name=tool_name, arguments=arguments)
started_at = perf_counter()
try:
result = await self.service.tool_executor.execute(tool_name, arguments, user_id=user_id)
elapsed_ms = round((perf_counter() - started_at) * 1000, 2)
self.log_turn_event(
"tool_completed",
tool_name=tool_name,
elapsed_ms=elapsed_ms,
arguments=arguments,
result=result,
)
return result
except HTTPException as exc:
elapsed_ms = round((perf_counter() - started_at) * 1000, 2)
self.log_turn_event(
"tool_failed",
tool_name=tool_name,
elapsed_ms=elapsed_ms,
arguments=arguments,
error=self.service.tool_executor.coerce_http_error(exc),
)
raise
async def render_tool_response_with_fallback(
self,
user_message: str,
user_id: int | None,
tool_name: str,
tool_result,
) -> str:
fallback_response = self.fallback_format_tool_result(tool_name, tool_result)
if self.should_use_deterministic_response(tool_name):
self.log_turn_event(
"tool_response_fallback",
tool_name=tool_name,
reason="deterministic_tool",
)
return fallback_response
try:
final_response = await self.call_llm_with_trace(
operation="tool_result_response",
message=self.build_result_prompt(
user_message=user_message,
user_id=user_id,
tool_name=tool_name,
tool_result=tool_result,
),
tools=[],
)
except Exception:
self.log_turn_event(
"tool_response_fallback",
tool_name=tool_name,
reason="llm_failure",
)
return fallback_response
text = (final_response.get("response") or "").strip()
if self.is_low_value_response(text):
self.log_turn_event(
"tool_response_fallback",
tool_name=tool_name,
reason="low_value_response",
)
return fallback_response
return text or fallback_response
def should_use_deterministic_response(self, tool_name: str) -> bool:
return tool_name in DETERMINISTIC_RESPONSE_TOOLS or tool_name in ORCHESTRATION_CONTROL_TOOLS
def is_low_value_response(self, text: str) -> bool:
return text.strip().lower() in LOW_VALUE_RESPONSES
def http_exception_detail(self, exc: HTTPException) -> str:
return self.service.tool_executor.http_exception_detail(exc)
def fallback_format_tool_result(self, tool_name: str, tool_result: Any) -> str:
return self.service.tool_executor.fallback_format_tool_result(
tool_name=tool_name,
tool_result=tool_result,
)

@ -23,6 +23,8 @@ from app.services.orchestration.message_planner import MessagePlanner
from app.services.orchestration.conversation_history_service import ConversationHistoryService
from app.services.orchestration.sensitive_data import mask_sensitive_payload, mask_sensitive_text
from app.services.orchestration.state_repository_factory import get_conversation_state_repository
from app.services.orchestration.orchestrator_context_manager import OrchestratorContextManager
from app.services.orchestration.orchestrator_execution_manager import OrchestratorExecutionManager
from app.services.flows.order_flow import OrderFlowMixin
from app.services.flows.rental_flow import RentalFlowMixin
from app.services.orchestration.prompt_builders import (
@ -55,6 +57,22 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin, RentalFlowMixin):
self.policy = ConversationPolicy(service=self)
self.history_service = ConversationHistoryService()
@property
def _context_manager(self) -> OrchestratorContextManager:
manager = getattr(self, "__context_manager", None)
if manager is None:
manager = OrchestratorContextManager(service=self)
setattr(self, "__context_manager", manager)
return manager
@property
def _execution_manager(self) -> OrchestratorExecutionManager:
manager = getattr(self, "__execution_manager", None)
if manager is None:
manager = OrchestratorExecutionManager(service=self, logger_instance=logger)
setattr(self, "__execution_manager", manager)
return manager
def _build_orchestration_tool_handlers(self) -> dict:
return {
"limpar_contexto_conversa": self._tool_limpar_contexto_conversa,
@ -1346,251 +1364,83 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin, RentalFlowMixin):
# Limpa drafts e selecoes de locacao quando o fluxo termina ou e abortado.
def _reset_pending_rental_states(self, user_id: int | None) -> None:
if user_id is None:
return
self.state.pop_entry("pending_rental_drafts", user_id)
self.state.pop_entry("pending_rental_selections", user_id)
context = self._get_user_context(user_id)
if isinstance(context, dict):
context["last_rental_results"] = []
context["selected_rental_vehicle"] = None
if context.get("active_task") == "rental_create":
context["active_task"] = None
if str(context.get("active_domain") or "").strip().lower() == "rental":
context["active_domain"] = "general"
self._save_user_context(user_id=user_id, context=context)
self._context_manager.reset_pending_rental_states(user_id=user_id)
def _reset_pending_review_states(self, user_id: int | None) -> None:
if user_id is None:
return
self.state.pop_entry("pending_review_drafts", user_id)
self.state.pop_entry("pending_review_confirmations", user_id)
self.state.pop_entry("pending_review_management_drafts", user_id)
self.state.pop_entry("pending_review_reuse_confirmations", user_id)
context = self._get_user_context(user_id)
if isinstance(context, dict):
snapshots = context.get("flow_snapshots")
if isinstance(snapshots, dict):
snapshots.pop("review_schedule", None)
snapshots.pop("review_confirmation", None)
snapshots.pop("review_management", None)
snapshots.pop("review_reuse_confirmation", None)
collected_slots = context.get("collected_slots")
if isinstance(collected_slots, dict):
collected_slots.pop("review_schedule", None)
collected_slots.pop("review_management", None)
if context.get("active_task") in {"review_schedule", "review_management"}:
context["active_task"] = None
self._save_user_context(user_id=user_id, context=context)
self._context_manager.reset_pending_review_states(user_id=user_id)
def _reset_pending_order_states(self, user_id: int | None) -> None:
if user_id is None:
return
self.state.pop_entry("pending_order_drafts", user_id)
self.state.pop_entry("pending_cancel_order_drafts", user_id)
self.state.pop_entry("pending_stock_selections", user_id)
context = self._get_user_context(user_id)
if isinstance(context, dict):
snapshots = context.get("flow_snapshots")
if isinstance(snapshots, dict):
snapshots.pop("order_create", None)
snapshots.pop("order_cancel", None)
collected_slots = context.get("collected_slots")
if isinstance(collected_slots, dict):
collected_slots.pop("order_create", None)
collected_slots.pop("order_cancel", None)
if context.get("active_task") in {"order_create", "order_cancel"}:
context["active_task"] = None
self._save_user_context(user_id=user_id, context=context)
self._context_manager.reset_pending_order_states(user_id=user_id)
def _clear_user_conversation_state(self, user_id: int | None) -> None:
context = self._get_user_context(user_id)
if not context:
return
self._reset_pending_review_states(user_id=user_id)
self._reset_pending_order_states(user_id=user_id)
self._reset_pending_rental_states(user_id=user_id)
self.state.pop_entry("last_review_packages", user_id)
context["active_domain"] = "general"
context["active_task"] = None
context["generic_memory"] = {}
context["shared_memory"] = {}
context["collected_slots"] = {}
context["flow_snapshots"] = {}
context["last_tool_result"] = None
context["order_queue"] = []
context["pending_order_selection"] = None
context["pending_switch"] = None
context["last_stock_results"] = []
context["selected_vehicle"] = None
context["last_rental_results"] = []
context["selected_rental_vehicle"] = None
context.pop("last_rental_contract", None)
self._save_user_context(user_id=user_id, context=context)
self._context_manager.clear_user_conversation_state(user_id=user_id)
def _clear_pending_order_navigation(self, user_id: int | None) -> int:
context = self._get_user_context(user_id)
if not context:
return 0
dropped = len(context.get("order_queue", []))
if context.get("pending_switch"):
dropped += 1
if context.get("pending_order_selection"):
pending_orders = context["pending_order_selection"].get("orders") or []
dropped += len(pending_orders)
context["order_queue"] = []
context["pending_switch"] = None
context["pending_order_selection"] = None
self._save_user_context(user_id=user_id, context=context)
return dropped
return self._context_manager.clear_pending_order_navigation(user_id=user_id)
def _cancel_active_flow(self, user_id: int | None) -> str:
context = self._get_user_context(user_id)
if not context:
return "Nao havia contexto ativo para cancelar."
active_domain = context.get("active_domain", "general")
had_flow = self._has_open_flow(user_id=user_id, domain=active_domain)
if active_domain == "review":
self._reset_pending_review_states(user_id=user_id)
elif active_domain == "sales":
self._reset_pending_order_states(user_id=user_id)
elif active_domain == "rental":
self._reset_pending_rental_states(user_id=user_id)
context["pending_switch"] = None
self._save_user_context(user_id=user_id, context=context)
if had_flow:
return f"Fluxo atual de {self._domain_label(active_domain)} cancelado."
return "Nao havia fluxo em andamento para cancelar."
return self._context_manager.cancel_active_flow(user_id=user_id)
async def _continue_next_order_now(self, user_id: int | None) -> str:
context = self._get_user_context(user_id)
if not context:
return "Nao encontrei contexto ativo para continuar."
if context.get("pending_order_selection"):
return "Ainda preciso que voce escolha qual das duas acoes deseja iniciar primeiro."
pending_switch = context.get("pending_switch")
if isinstance(pending_switch, dict):
queued_message = str(pending_switch.get("queued_message") or "").strip()
if queued_message:
target_domain = str(pending_switch.get("target_domain") or "general")
memory_seed = dict(pending_switch.get("memory_seed") or {})
self._apply_domain_switch(user_id=user_id, target_domain=target_domain)
refreshed = self._get_user_context(user_id)
if refreshed is not None:
refreshed["generic_memory"] = memory_seed
self._save_user_context(user_id=user_id, context=refreshed)
transition = self._build_next_order_transition(target_domain)
next_response = await self.handle_message(queued_message, user_id=user_id)
return f"{transition}\n{next_response}"
next_order = self._pop_next_order(user_id=user_id)
if not next_order:
return "Nao ha pedidos pendentes na fila para continuar."
target_domain = str(next_order.get("domain") or "general")
memory_seed = dict(next_order.get("memory_seed") or self._new_tab_memory(user_id=user_id))
self._apply_domain_switch(user_id=user_id, target_domain=target_domain)
refreshed = self._get_user_context(user_id)
if refreshed is not None:
refreshed["generic_memory"] = memory_seed
self._save_user_context(user_id=user_id, context=refreshed)
transition = self._build_next_order_transition(target_domain)
next_response = await self.handle_message(str(next_order.get("message") or ""), user_id=user_id)
return f"{transition}\n{next_response}"
return await self._context_manager.continue_next_order_now(user_id=user_id)
async def _tool_limpar_contexto_conversa(
self,
motivo: str | None = None,
user_id: int | None = None,
) -> dict:
self._clear_user_conversation_state(user_id=user_id)
message = "Contexto da conversa limpo. Podemos recomecar do zero."
if motivo:
message = f"{message}\nMotivo registrado: {motivo.strip()}"
return {"message": message}
return await self._context_manager.tool_limpar_contexto_conversa(
motivo=motivo,
user_id=user_id,
)
async def _tool_descartar_pedidos_pendentes(
self,
motivo: str | None = None,
user_id: int | None = None,
) -> dict:
dropped = self._clear_pending_order_navigation(user_id=user_id)
if dropped <= 0:
message = "Nao havia pedidos pendentes na fila para descartar."
elif dropped == 1:
message = "Descartei 1 pedido pendente da fila."
else:
message = f"Descartei {dropped} pedidos pendentes da fila."
if motivo:
message = f"{message}\nMotivo registrado: {motivo.strip()}"
return {"message": message}
return await self._context_manager.tool_descartar_pedidos_pendentes(
motivo=motivo,
user_id=user_id,
)
async def _tool_cancelar_fluxo_atual(
self,
motivo: str | None = None,
user_id: int | None = None,
) -> dict:
message = self._cancel_active_flow(user_id=user_id)
if motivo:
message = f"{message}\nMotivo registrado: {motivo.strip()}"
return {"message": message}
return await self._context_manager.tool_cancelar_fluxo_atual(
motivo=motivo,
user_id=user_id,
)
async def _tool_continuar_proximo_pedido(self, user_id: int | None = None) -> str:
return await self._continue_next_order_now(user_id=user_id)
return await self._context_manager.tool_continuar_proximo_pedido(user_id=user_id)
# Nessa funcao eu configuro a memoria volatil do sistema
def _upsert_user_context(self, user_id: int | None) -> None:
self.state.upsert_user_context(user_id=user_id, ttl_minutes=USER_CONTEXT_TTL_MINUTES)
self._context_manager.upsert_user_context(user_id=user_id)
def _get_user_context(self, user_id: int | None) -> dict | None:
return self.state.get_user_context(user_id)
return self._context_manager.get_user_context(user_id)
def _save_user_context(self, user_id: int | None, context: dict | None) -> None:
if user_id is None or not isinstance(context, dict):
return
self.state.save_user_context(user_id=user_id, context=context)
self._context_manager.save_user_context(user_id=user_id, context=context)
def _extract_generic_memory_fields(self, llm_generic_fields: dict | None = None) -> dict:
extracted: dict = {}
llm_fields = llm_generic_fields or {}
normalized_plate = self._normalize_plate(llm_fields.get("placa"))
if normalized_plate:
extracted["placa"] = normalized_plate
normalized_cpf = self._normalize_cpf(llm_fields.get("cpf"))
if normalized_cpf:
extracted["cpf"] = normalized_cpf
normalized_budget = self._normalize_positive_number(llm_fields.get("orcamento_max"))
if normalized_budget:
extracted["orcamento_max"] = int(round(normalized_budget))
normalized_profile = self._normalize_vehicle_profile(llm_fields.get("perfil_veiculo"))
if normalized_profile:
extracted["perfil_veiculo"] = normalized_profile
return extracted
return self._context_manager.extract_generic_memory_fields(
llm_generic_fields=llm_generic_fields,
)
def _capture_generic_memory(
self,
user_id: int | None,
llm_generic_fields: dict | None = None,
) -> None:
context = self._get_user_context(user_id)
if not context:
return
fields = self._extract_generic_memory_fields(llm_generic_fields=llm_generic_fields)
if fields:
# "Memoria generica" e um dict acumulado por usuario.
# Campos novos entram e campos repetidos sobrescrevem valor antigo.
context["generic_memory"].update(fields)
context.setdefault("shared_memory", {}).update(fields)
self._save_user_context(user_id=user_id, context=context)
self._context_manager.capture_generic_memory(
user_id=user_id,
llm_generic_fields=llm_generic_fields,
)
def _capture_tool_result_context(
self,
@ -1598,51 +1448,11 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin, RentalFlowMixin):
tool_result,
user_id: int | None,
) -> None:
context = self._get_user_context(user_id)
if not context:
return
context["last_tool_result"] = {
"tool_name": tool_name,
"result_type": type(tool_result).__name__,
}
if tool_name == "consultar_frota_aluguel" and isinstance(tool_result, list):
sanitized_rental = self._sanitize_rental_results(tool_result[:20])
context["last_rental_results"] = sanitized_rental
self._store_pending_rental_selection(user_id=user_id, rental_results=sanitized_rental)
if sanitized_rental:
context["selected_rental_vehicle"] = None
context["active_domain"] = "rental"
self._save_user_context(user_id=user_id, context=context)
return
if tool_name != "consultar_estoque" or not isinstance(tool_result, list):
self._save_user_context(user_id=user_id, context=context)
return
sanitized: list[dict] = []
for item in tool_result[:5]:
if not isinstance(item, dict):
continue
try:
vehicle_id = int(item.get("id"))
preco = float(item.get("preco") or 0)
except (TypeError, ValueError):
continue
sanitized.append(
{
"id": vehicle_id,
"modelo": str(item.get("modelo") or "").strip(),
"categoria": str(item.get("categoria") or "").strip(),
"preco": preco,
"budget_relaxed": bool(item.get("budget_relaxed", False)),
}
)
context["last_stock_results"] = sanitized
self._store_pending_stock_selection(user_id=user_id, stock_results=sanitized)
if sanitized:
context["selected_vehicle"] = None
self._save_user_context(user_id=user_id, context=context)
self._context_manager.capture_tool_result_context(
tool_name=tool_name,
tool_result=tool_result,
user_id=user_id,
)
def _capture_successful_tool_side_effects(
self,
@ -1651,17 +1461,9 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin, RentalFlowMixin):
tool_result,
user_id: int | None,
) -> None:
if tool_name == "agendar_revisao" and isinstance(arguments, dict):
self._store_last_review_package(user_id=user_id, payload=arguments)
if tool_name in {
"abrir_locacao_aluguel",
"registrar_devolucao_aluguel",
"registrar_pagamento_aluguel",
"registrar_multa_aluguel",
} and isinstance(tool_result, dict):
self._store_last_rental_contract(user_id=user_id, payload=tool_result)
self._capture_tool_result_context(
self._context_manager.capture_successful_tool_side_effects(
tool_name=tool_name,
arguments=arguments,
tool_result=tool_result,
user_id=user_id,
)
@ -1673,72 +1475,15 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin, RentalFlowMixin):
tool_result,
user_id: int | None,
) -> str | None:
if tool_name != "consultar_estoque" or not isinstance(tool_result, list) or tool_result:
return None
budget = self._normalize_positive_number((arguments or {}).get("preco_max"))
if not budget:
return None
relaxed_arguments = dict(arguments or {})
relaxed_arguments["preco_max"] = max(float(budget) * 1.2, float(budget) + 10000.0)
relaxed_arguments["limite"] = min(max(int((arguments or {}).get("limite") or 5), 1), 5)
relaxed_arguments["ordenar_preco"] = "asc"
try:
relaxed_result = await self.tool_executor.execute(
"consultar_estoque",
relaxed_arguments,
user_id=user_id,
)
except HTTPException:
return None
if not isinstance(relaxed_result, list):
return None
nearby = []
for item in relaxed_result:
if not isinstance(item, dict):
continue
try:
price = float(item.get("preco") or 0)
except (TypeError, ValueError):
continue
if price > float(budget):
nearby.append(item)
if not nearby:
return None
nearby = [{**item, "budget_relaxed": True} for item in nearby]
self._capture_tool_result_context(
tool_name="consultar_estoque",
tool_result=nearby,
return await self._context_manager.maybe_build_stock_suggestion_response(
tool_name=tool_name,
arguments=arguments,
tool_result=tool_result,
user_id=user_id,
)
budget_label = f"R$ {float(budget):,.0f}".replace(",", ".")
lines = [f"Nao encontrei veiculos ate {budget_label}."]
lines.append("Mas achei algumas opcoes proximas ao seu orcamento:")
for idx, item in enumerate(nearby[:5], start=1):
modelo = str(item.get("modelo") or "N/A")
categoria = str(item.get("categoria") or "N/A")
codigo = item.get("id", "N/A")
preco = f"R$ {float(item.get('preco') or 0):,.2f}".replace(",", "X").replace(".", ",").replace("X", ".")
lines.append(f"{idx}. [{codigo}] {modelo} ({categoria}) - {preco}")
lines.append("Se quiser, responda com o numero da lista ou com o modelo.")
return "\n".join(lines)
def _new_tab_memory(self, user_id: int | None) -> dict:
context = self._get_user_context(user_id)
if not context:
return {}
shared = context.get("shared_memory", {})
if not isinstance(shared, dict):
return {}
return dict(shared)
return self._context_manager.new_tab_memory(user_id=user_id)
def _empty_extraction_payload(self) -> dict:
return self.normalizer.empty_extraction_payload()
@ -2539,19 +2284,15 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin, RentalFlowMixin):
return self._fallback_format_tool_result("agendar_revisao", tool_result)
def _build_router_prompt(self, user_message: str, user_id: int | None) -> str:
conversation_context = self._build_context_summary(user_id=user_id)
return build_router_prompt(
return self._execution_manager.build_router_prompt(
user_message=user_message,
user_id=user_id,
conversation_context=conversation_context,
)
def _build_force_tool_prompt(self, user_message: str, user_id: int | None) -> str:
conversation_context = self._build_context_summary(user_id=user_id)
return build_force_tool_prompt(
return self._execution_manager.build_force_tool_prompt(
user_message=user_message,
user_id=user_id,
conversation_context=conversation_context,
)
def _build_result_prompt(
@ -2561,29 +2302,21 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin, RentalFlowMixin):
tool_name: str,
tool_result,
) -> str:
conversation_context = self._build_context_summary(user_id=user_id)
return build_result_prompt(
return self._execution_manager.build_result_prompt(
user_message=user_message,
user_id=user_id,
tool_name=tool_name,
tool_result=tool_result,
conversation_context=conversation_context,
)
def _capture_turn_decision_trace(self, turn_decision: dict | None) -> None:
trace = getattr(self, "_turn_trace", None)
if not isinstance(trace, dict) or not isinstance(turn_decision, dict):
return
trace["intent"] = str(turn_decision.get("intent") or "").strip() or None
trace["domain"] = str(turn_decision.get("domain") or "").strip() or None
trace["action"] = str(turn_decision.get("action") or "").strip() or None
self._execution_manager.capture_turn_decision_trace(turn_decision=turn_decision)
def _capture_tool_invocation_trace(self, tool_name: str, arguments: dict | None) -> None:
trace = getattr(self, "_turn_trace", None)
if not isinstance(trace, dict):
return
trace["tool_name"] = str(tool_name or "").strip() or None
trace["tool_arguments"] = mask_sensitive_payload(dict(arguments or {})) if isinstance(arguments, dict) else None
self._execution_manager.capture_tool_invocation_trace(
tool_name=tool_name,
arguments=arguments,
)
def _finalize_turn_history(
self,
@ -2593,72 +2326,25 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin, RentalFlowMixin):
turn_status: str,
error_detail: str | None = None,
) -> None:
history_service = getattr(self, "history_service", None)
if history_service is None:
return
trace = getattr(self, "_turn_trace", {}) or {}
history_service.record_turn(
request_id=str(trace.get("request_id") or ""),
conversation_id=str(trace.get("conversation_id") or "anonymous"),
user_id=trace.get("user_id"),
user_message=str(user_message or ""),
self._execution_manager.finalize_turn_history(
user_message=user_message,
assistant_response=assistant_response,
turn_status=str(turn_status or "completed"),
intent=trace.get("intent"),
domain=trace.get("domain"),
action=trace.get("action"),
tool_name=trace.get("tool_name"),
tool_arguments=trace.get("tool_arguments"),
turn_status=turn_status,
error_detail=error_detail,
started_at=trace.get("started_at"),
completed_at=utc_now(),
elapsed_ms=trace.get("elapsed_ms"),
)
def _format_turn_error(self, exc: Exception) -> str:
if isinstance(exc, HTTPException):
detail = exc.detail
if isinstance(detail, dict):
return json.dumps(mask_sensitive_payload(detail), ensure_ascii=True, separators=(",", ":"), default=str)
return str(mask_sensitive_text(str(detail)))
return str(mask_sensitive_text(f"{type(exc).__name__}: {exc}"))
return self._execution_manager.format_turn_error(exc)
def _log_turn_event(self, event: str, **payload) -> None:
trace = getattr(self, "_turn_trace", {}) or {}
safe_payload = mask_sensitive_payload(
{
"request_id": trace.get("request_id"),
"conversation_id": trace.get("conversation_id"),
**payload,
}
)
logger.info(
"turn_event=%s payload=%s",
event,
safe_payload,
)
self._execution_manager.log_turn_event(event, **payload)
async def _call_llm_with_trace(self, operation: str, message: str, tools):
started_at = perf_counter()
try:
result = await self.llm.generate_response(message=message, tools=tools)
elapsed_ms = round((perf_counter() - started_at) * 1000, 2)
self._log_turn_event(
"llm_completed",
operation=operation,
elapsed_ms=elapsed_ms,
tool_call=bool(result.get("tool_call")),
)
return result
except Exception:
elapsed_ms = round((perf_counter() - started_at) * 1000, 2)
self._log_turn_event(
"llm_failed",
operation=operation,
elapsed_ms=elapsed_ms,
)
raise
return await self._execution_manager.call_llm_with_trace(
operation=operation,
message=message,
tools=tools,
)
def _merge_pending_draft_tool_arguments(
self,
@ -2666,32 +2352,11 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin, RentalFlowMixin):
arguments: dict,
user_id: int | None,
) -> dict:
if user_id is None or not isinstance(arguments, dict):
return dict(arguments or {})
if not hasattr(self, "state") or self.state is None:
return dict(arguments)
bucket_map = {
"agendar_revisao": "pending_review_drafts",
"realizar_pedido": "pending_order_drafts",
"cancelar_pedido": "pending_cancel_order_drafts",
"cancelar_agendamento_revisao": "pending_review_management_drafts",
"editar_data_revisao": "pending_review_management_drafts",
}
bucket = bucket_map.get(tool_name)
if not bucket:
return dict(arguments)
draft = self.state.get_entry(bucket, user_id, expire=True)
if not isinstance(draft, dict):
return dict(arguments)
payload = draft.get("payload")
if not isinstance(payload, dict):
return dict(arguments)
merged_arguments = dict(payload)
merged_arguments.update(arguments)
return merged_arguments
return self._execution_manager.merge_pending_draft_tool_arguments(
tool_name=tool_name,
arguments=arguments,
user_id=user_id,
)
def _normalize_tool_invocation(
self,
@ -2699,48 +2364,18 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin, RentalFlowMixin):
arguments: dict | None,
user_id: int | None,
) -> tuple[str, dict]:
normalizer = getattr(self, "normalizer", None)
if normalizer is None:
normalizer = EntityNormalizer()
self.normalizer = normalizer
normalized_tool_name = normalizer.normalize_tool_name(tool_name) or str(tool_name or "").strip()
normalized_arguments = normalizer.normalize_tool_arguments(normalized_tool_name, arguments or {})
normalized_arguments = self._merge_pending_draft_tool_arguments(
tool_name=normalized_tool_name,
arguments=normalized_arguments,
return self._execution_manager.normalize_tool_invocation(
tool_name=tool_name,
arguments=arguments,
user_id=user_id,
)
return normalized_tool_name, normalized_arguments
async def _execute_tool_with_trace(self, tool_name: str, arguments: dict, user_id: int | None):
tool_name, arguments = self._normalize_tool_invocation(
return await self._execution_manager.execute_tool_with_trace(
tool_name=tool_name,
arguments=arguments,
user_id=user_id,
)
self._capture_tool_invocation_trace(tool_name=tool_name, arguments=arguments)
started_at = perf_counter()
try:
result = await self.tool_executor.execute(tool_name, arguments, user_id=user_id)
elapsed_ms = round((perf_counter() - started_at) * 1000, 2)
self._log_turn_event(
"tool_completed",
tool_name=tool_name,
elapsed_ms=elapsed_ms,
arguments=arguments,
result=result,
)
return result
except HTTPException as exc:
elapsed_ms = round((perf_counter() - started_at) * 1000, 2)
self._log_turn_event(
"tool_failed",
tool_name=tool_name,
elapsed_ms=elapsed_ms,
arguments=arguments,
error=self.tool_executor.coerce_http_error(exc),
)
raise
async def _render_tool_response_with_fallback(
self,
@ -2749,48 +2384,137 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin, RentalFlowMixin):
tool_name: str,
tool_result,
) -> str:
fallback_response = self._fallback_format_tool_result(tool_name, tool_result)
if self._should_use_deterministic_response(tool_name):
self._log_turn_event(
"tool_response_fallback",
tool_name=tool_name,
reason="deterministic_tool",
)
return fallback_response
return await self._execution_manager.render_tool_response_with_fallback(
user_message=user_message,
user_id=user_id,
tool_name=tool_name,
tool_result=tool_result,
)
def _should_use_deterministic_response(self, tool_name: str) -> bool:
return self._execution_manager.should_use_deterministic_response(tool_name)
def _normalize_text(self, text: str) -> str:
return self.normalizer.normalize_text(text)
def _is_low_value_response(self, text: str) -> bool:
return self._execution_manager.is_low_value_response(text)
def _is_affirmative_message(self, text: str) -> bool:
normalized = self._normalize_text(text).strip().rstrip(".!?,;:")
return normalized in {"sim", "pode", "ok", "confirmo", "aceito", "fechado", "pode sim", "tenho", "tenho sim"}
def _is_negative_message(self, text: str) -> bool:
normalized = self._normalize_text(text).strip().rstrip(".!?,;:")
return (
normalized in {"nao", "nao quero", "prefiro outro", "outro horario"}
or normalized.startswith("nao")
)
def _extract_time_only(self, text: str) -> str | None:
return self.normalizer.extract_hhmm_from_text(text)
def _merge_date_with_time(self, base_iso: str, new_time_hhmm: str) -> str | None:
try:
final_response = await self._call_llm_with_trace(
operation="tool_result_response",
message=self._build_result_prompt(
user_message=user_message,
user_id=user_id,
tool_name=tool_name,
tool_result=tool_result,
),
tools=[],
)
base_dt = datetime.fromisoformat((base_iso or "").replace("Z", "+00:00"))
except ValueError:
return None
try:
hour_text, minute_text = new_time_hhmm.split(":")
merged = base_dt.replace(hour=int(hour_text), minute=int(minute_text), second=0, microsecond=0)
return merged.isoformat()
except Exception:
self._log_turn_event(
"tool_response_fallback",
tool_name=tool_name,
reason="llm_failure",
)
return fallback_response
return None
text = (final_response.get("response") or "").strip()
if self._is_low_value_response(text):
self._log_turn_event(
"tool_response_fallback",
tool_name=tool_name,
reason="low_value_response",
def _capture_review_confirmation_suggestion(
self,
tool_name: str,
arguments: dict,
exc: HTTPException,
user_id: int | None,
) -> None:
if tool_name != "agendar_revisao" or user_id is None or exc.status_code != 409:
return
detail = exc.detail if isinstance(exc.detail, dict) else {}
suggested_iso = str(detail.get("suggested_iso") or "").strip()
if not suggested_iso:
return
payload = dict(arguments or {})
if not payload.get("placa"):
return
payload["data_hora"] = suggested_iso
self.state.set_entry("pending_review_confirmations", user_id, {
"payload": payload,
"expires_at": utc_now() + timedelta(minutes=PENDING_REVIEW_TTL_MINUTES),
})
async def _try_confirm_pending_review(
self,
message: str,
user_id: int | None,
extracted_review_fields: dict | None = None,
) -> str | None:
if user_id is None:
return None
pending = self.state.get_entry("pending_review_confirmations", user_id, expire=True)
if not pending:
return None
time_only = self._extract_time_only(message)
if self._is_negative_message(message) or time_only:
extracted = self._normalize_review_fields(extracted_review_fields)
new_data_hora = extracted.get("data_hora")
if not new_data_hora and time_only:
new_data_hora = self._merge_date_with_time(pending["payload"].get("data_hora", ""), time_only)
if not new_data_hora:
self.state.pop_entry("pending_review_confirmations", user_id)
return "Sem problema. Me informe a nova data e hora desejada para a revisao."
payload = dict(pending["payload"])
payload["data_hora"] = new_data_hora
try:
tool_result = await self.tool_executor.execute(
"agendar_revisao",
payload,
user_id=user_id,
)
except HTTPException as exc:
self.state.pop_entry("pending_review_confirmations", user_id)
self._capture_review_confirmation_suggestion(
tool_name="agendar_revisao",
arguments=payload,
exc=exc,
user_id=user_id,
)
return self._http_exception_detail(exc)
self._reset_pending_review_states(user_id=user_id)
self._store_last_review_package(user_id=user_id, payload=payload)
return self._fallback_format_tool_result("agendar_revisao", tool_result)
if not self._is_affirmative_message(message):
return None
try:
tool_result = await self.tool_executor.execute(
"agendar_revisao",
pending["payload"],
user_id=user_id,
)
return fallback_response
return text or fallback_response
except HTTPException as exc:
self.state.pop_entry("pending_review_confirmations", user_id)
return self._http_exception_detail(exc)
self._reset_pending_review_states(user_id=user_id)
self._store_last_review_package(user_id=user_id, payload=pending.get("payload"))
return self._fallback_format_tool_result("agendar_revisao", tool_result)
def _http_exception_detail(self, exc: HTTPException) -> str:
return self.tool_executor.http_exception_detail(exc)
return self._execution_manager.http_exception_detail(exc)
def _fallback_format_tool_result(self, tool_name: str, tool_result) -> str:
return self.tool_executor.fallback_format_tool_result(tool_name=tool_name, tool_result=tool_result)
return self._execution_manager.fallback_format_tool_result(
tool_name=tool_name,
tool_result=tool_result,
)

Loading…
Cancel
Save