diff --git a/app/services/flows/flow_state_support.py b/app/services/flows/flow_state_support.py new file mode 100644 index 0000000..2bd98ec --- /dev/null +++ b/app/services/flows/flow_state_support.py @@ -0,0 +1,131 @@ +from __future__ import annotations + +from app.core.time_utils import utc_now + + +class FlowStateSupport: + """Utilitarios compartilhados para buckets e snapshots de fluxo.""" + + def __init__(self, service) -> None: + self.service = service + + def get_state_repository(self): + return getattr(self.service, "state", None) + + def get_state_entry(self, bucket: str, user_id: int | None, *, expire: bool = False): + state = self.get_state_repository() + if state is None or not hasattr(state, "get_entry"): + return None + return state.get_entry(bucket, user_id, expire=expire) + + def set_state_entry(self, bucket: str, user_id: int | None, value) -> None: + state = self.get_state_repository() + if state is None or not hasattr(state, "set_entry"): + return + state.set_entry(bucket, user_id, value) + + def pop_state_entry(self, bucket: str, user_id: int | None): + state = self.get_state_repository() + if state is None or not hasattr(state, "pop_entry"): + return None + return state.pop_entry(bucket, user_id) + + def get_flow_snapshot(self, user_id: int | None, snapshot_key: str) -> dict | None: + if user_id is None or not hasattr(self.service, "_get_user_context"): + return None + context = self.service._get_user_context(user_id) + if not isinstance(context, dict): + return None + snapshots = context.get("flow_snapshots") + if not isinstance(snapshots, dict): + return None + snapshot = snapshots.get(snapshot_key) + return dict(snapshot) if isinstance(snapshot, dict) else None + + def set_flow_snapshot( + self, + user_id: int | None, + snapshot_key: str, + value: dict | None, + *, + active_task: str | None = None, + ) -> None: + if user_id is None or not hasattr(self.service, "_get_user_context") or not hasattr(self.service, "_save_user_context"): + return + context = self.service._get_user_context(user_id) + if not isinstance(context, dict): + return + snapshots = context.get("flow_snapshots") + if not isinstance(snapshots, dict): + snapshots = {} + context["flow_snapshots"] = snapshots + + if isinstance(value, dict): + snapshots[snapshot_key] = value + if active_task: + context["active_task"] = active_task + collected_slots = context.get("collected_slots") + if not isinstance(collected_slots, dict): + collected_slots = {} + context["collected_slots"] = collected_slots + payload = value.get("payload") + if isinstance(payload, dict): + collected_slots[active_task] = dict(payload) + else: + snapshots.pop(snapshot_key, None) + if active_task and context.get("active_task") == active_task: + context["active_task"] = None + collected_slots = context.get("collected_slots") + if isinstance(collected_slots, dict) and active_task: + collected_slots.pop(active_task, None) + + self.service._save_user_context(user_id=user_id, context=context) + + def get_flow_entry(self, bucket: str, user_id: int | None, snapshot_key: str) -> dict | None: + entry = self.get_state_entry(bucket, user_id, expire=True) + if entry: + return entry + + snapshot = self.get_flow_snapshot(user_id=user_id, snapshot_key=snapshot_key) + if not snapshot: + return None + if snapshot.get("expires_at") and snapshot["expires_at"] < utc_now(): + self.set_flow_snapshot(user_id=user_id, snapshot_key=snapshot_key, value=None) + return None + + self.set_state_entry(bucket, user_id, snapshot) + return snapshot + + def set_flow_entry( + self, + bucket: str, + user_id: int | None, + snapshot_key: str, + value: dict, + *, + active_task: str | None = None, + ) -> None: + self.set_state_entry(bucket, user_id, value) + self.set_flow_snapshot( + user_id=user_id, + snapshot_key=snapshot_key, + value=value, + active_task=active_task, + ) + + def pop_flow_entry( + self, + bucket: str, + user_id: int | None, + snapshot_key: str, + *, + active_task: str | None = None, + ) -> dict | None: + entry = self.pop_state_entry(bucket, user_id) + self.set_flow_snapshot( + user_id=user_id, + snapshot_key=snapshot_key, + value=None, + active_task=active_task, + ) + return entry diff --git a/app/services/flows/order_flow.py b/app/services/flows/order_flow.py index 3975037..237ae35 100644 --- a/app/services/flows/order_flow.py +++ b/app/services/flows/order_flow.py @@ -15,43 +15,28 @@ from app.services.orchestration.orchestrator_config import ( PENDING_ORDER_SELECTION_TTL_MINUTES, ) from app.services.user.mock_customer_service import hydrate_mock_customer_from_cpf +from app.services.flows.order_flow_support import OrderFlowStateSupport # Esse mixin cuida dos fluxos de venda: # criacao de pedido, selecao de veiculo e cancelamento. class OrderFlowMixin: + @property + def _order_flow_state_support(self) -> OrderFlowStateSupport: + support = getattr(self, "__order_flow_state_support", None) + if support is None: + support = OrderFlowStateSupport(self) + setattr(self, "__order_flow_state_support", support) + return support + def _sanitize_stock_results(self, stock_results: list[dict] | None) -> list[dict]: - sanitized: list[dict] = [] - for item in stock_results or []: - if not isinstance(item, dict): - continue - try: - vehicle_id = int(item.get("id")) - preco = float(item.get("preco") or 0) - except (TypeError, ValueError): - continue - sanitized.append( - { - "id": vehicle_id, - "modelo": str(item.get("modelo") or "").strip(), - "categoria": str(item.get("categoria") or "").strip(), - "preco": preco, - "budget_relaxed": bool(item.get("budget_relaxed", False)), - } - ) - return sanitized + return self._order_flow_state_support.sanitize_stock_results(stock_results) def _get_order_flow_snapshot(self, user_id: int | None, snapshot_key: str) -> dict | None: - if user_id is None or not hasattr(self, "_get_user_context"): - return None - context = self._get_user_context(user_id) - if not isinstance(context, dict): - return None - snapshots = context.get("flow_snapshots") - if not isinstance(snapshots, dict): - return None - snapshot = snapshots.get(snapshot_key) - return dict(snapshot) if isinstance(snapshot, dict) else None + return self._order_flow_state_support.get_flow_snapshot( + user_id=user_id, + snapshot_key=snapshot_key, + ) def _set_order_flow_snapshot( self, @@ -61,51 +46,19 @@ class OrderFlowMixin: *, active_task: str | None = None, ) -> None: - if user_id is None or not hasattr(self, "_get_user_context") or not hasattr(self, "_save_user_context"): - return - context = self._get_user_context(user_id) - if not isinstance(context, dict): - return - snapshots = context.get("flow_snapshots") - if not isinstance(snapshots, dict): - snapshots = {} - context["flow_snapshots"] = snapshots - - if isinstance(value, dict): - snapshots[snapshot_key] = value - if active_task: - context["active_task"] = active_task - collected_slots = context.get("collected_slots") - if not isinstance(collected_slots, dict): - collected_slots = {} - context["collected_slots"] = collected_slots - payload = value.get("payload") - if isinstance(payload, dict): - collected_slots[active_task] = dict(payload) - else: - snapshots.pop(snapshot_key, None) - if active_task and context.get("active_task") == active_task: - context["active_task"] = None - collected_slots = context.get("collected_slots") - if isinstance(collected_slots, dict) and active_task: - collected_slots.pop(active_task, None) - - self._save_user_context(user_id=user_id, context=context) + self._order_flow_state_support.set_flow_snapshot( + user_id=user_id, + snapshot_key=snapshot_key, + value=value, + active_task=active_task, + ) def _get_order_flow_entry(self, bucket: str, user_id: int | None, snapshot_key: str) -> dict | None: - entry = self.state.get_entry(bucket, user_id, expire=True) - if entry: - return entry - - snapshot = self._get_order_flow_snapshot(user_id=user_id, snapshot_key=snapshot_key) - if not snapshot: - return None - if snapshot.get("expires_at") and snapshot["expires_at"] < utc_now(): - self._set_order_flow_snapshot(user_id=user_id, snapshot_key=snapshot_key, value=None) - return None - - self.state.set_entry(bucket, user_id, snapshot) - return snapshot + return self._order_flow_state_support.get_flow_entry( + bucket=bucket, + user_id=user_id, + snapshot_key=snapshot_key, + ) def _set_order_flow_entry( self, @@ -116,8 +69,8 @@ class OrderFlowMixin: *, active_task: str | None = None, ) -> None: - self.state.set_entry(bucket, user_id, value) - self._set_order_flow_snapshot( + self._order_flow_state_support.set_flow_entry( + bucket=bucket, user_id=user_id, snapshot_key=snapshot_key, value=value, @@ -132,14 +85,12 @@ class OrderFlowMixin: *, active_task: str | None = None, ) -> dict | None: - entry = self.state.pop_entry(bucket, user_id) - self._set_order_flow_snapshot( + return self._order_flow_state_support.pop_flow_entry( + bucket=bucket, user_id=user_id, snapshot_key=snapshot_key, - value=None, active_task=active_task, ) - return entry def _decision_intent(self, turn_decision: dict | None) -> str: return str((turn_decision or {}).get("intent") or "").strip().lower() @@ -275,89 +226,40 @@ class OrderFlowMixin: db.close() def _get_last_stock_results(self, user_id: int | None) -> list[dict]: - pending_selection = self.state.get_entry("pending_stock_selections", user_id, expire=True) - if isinstance(pending_selection, dict): - payload = pending_selection.get("payload") - if isinstance(payload, list): - sanitized = self._sanitize_stock_results(payload) - if sanitized: - return sanitized - context = self._get_user_context(user_id) - if not context: - return [] - stock_results = context.get("last_stock_results") or [] - return self._sanitize_stock_results(stock_results if isinstance(stock_results, list) else []) + return self._order_flow_state_support.get_last_stock_results(user_id=user_id) def _store_pending_stock_selection(self, user_id: int | None, stock_results: list[dict] | None) -> None: - if user_id is None: - return - sanitized = self._sanitize_stock_results(stock_results) - if not sanitized: - self.state.pop_entry("pending_stock_selections", user_id) - return - self.state.set_entry( - "pending_stock_selections", - user_id, - { - "payload": sanitized, - "expires_at": utc_now() + timedelta(minutes=PENDING_ORDER_SELECTION_TTL_MINUTES), - }, + self._order_flow_state_support.store_pending_stock_selection( + user_id=user_id, + stock_results=stock_results, ) def _get_selected_vehicle(self, user_id: int | None) -> dict | None: - context = self._get_user_context(user_id) - if not context: - return None - selected_vehicle = context.get("selected_vehicle") - return dict(selected_vehicle) if isinstance(selected_vehicle, dict) else None + return self._order_flow_state_support.get_selected_vehicle(user_id=user_id) def _get_pending_single_vehicle_confirmation(self, user_id: int | None) -> dict | None: - context = self._get_user_context(user_id) - if not context: - return None - pending_vehicle = context.get("pending_single_vehicle_confirmation") - return dict(pending_vehicle) if isinstance(pending_vehicle, dict) else None + return self._order_flow_state_support.get_pending_single_vehicle_confirmation(user_id=user_id) def _remember_stock_results(self, user_id: int | None, stock_results: list[dict] | None) -> None: - context = self._get_user_context(user_id) - if not context: - return - sanitized = self._sanitize_stock_results(stock_results) - context["last_stock_results"] = sanitized - self._store_pending_stock_selection(user_id=user_id, stock_results=sanitized) - if sanitized: - context["selected_vehicle"] = None - context["pending_single_vehicle_confirmation"] = None - self._save_user_context(user_id=user_id, context=context) + self._order_flow_state_support.remember_stock_results( + user_id=user_id, + stock_results=stock_results, + ) def _store_selected_vehicle(self, user_id: int | None, vehicle: dict | None) -> None: - if user_id is None: - return - context = self._get_user_context(user_id) - if not context: - return - context["selected_vehicle"] = dict(vehicle) if isinstance(vehicle, dict) else None - context["pending_single_vehicle_confirmation"] = None - self.state.pop_entry("pending_stock_selections", user_id) - self._save_user_context(user_id=user_id, context=context) + self._order_flow_state_support.store_selected_vehicle( + user_id=user_id, + vehicle=vehicle, + ) def _store_pending_single_vehicle_confirmation(self, user_id: int | None, vehicle: dict | None) -> None: - if user_id is None: - return - context = self._get_user_context(user_id) - if not context: - return - context["pending_single_vehicle_confirmation"] = dict(vehicle) if isinstance(vehicle, dict) else None - self._save_user_context(user_id=user_id, context=context) + self._order_flow_state_support.store_pending_single_vehicle_confirmation( + user_id=user_id, + vehicle=vehicle, + ) def _clear_pending_single_vehicle_confirmation(self, user_id: int | None) -> None: - if user_id is None: - return - context = self._get_user_context(user_id) - if not isinstance(context, dict): - return - context["pending_single_vehicle_confirmation"] = None - self._save_user_context(user_id=user_id, context=context) + self._order_flow_state_support.clear_pending_single_vehicle_confirmation(user_id=user_id) def _vehicle_to_payload(self, vehicle: dict) -> dict: return { @@ -1192,3 +1094,4 @@ class OrderFlowMixin: return self._fallback_format_tool_result("cancelar_pedido", tool_result) + diff --git a/app/services/flows/order_flow_support.py b/app/services/flows/order_flow_support.py new file mode 100644 index 0000000..953a822 --- /dev/null +++ b/app/services/flows/order_flow_support.py @@ -0,0 +1,117 @@ +from __future__ import annotations + +from datetime import timedelta + +from app.core.time_utils import utc_now +from app.services.flows.flow_state_support import FlowStateSupport +from app.services.orchestration.orchestrator_config import PENDING_ORDER_SELECTION_TTL_MINUTES + + +class OrderFlowStateSupport(FlowStateSupport): + """Concentra estado, snapshots e selecoes do fluxo de vendas.""" + + def sanitize_stock_results(self, stock_results: list[dict] | None) -> list[dict]: + sanitized: list[dict] = [] + for item in stock_results or []: + if not isinstance(item, dict): + continue + try: + vehicle_id = int(item.get("id")) + preco = float(item.get("preco") or 0) + except (TypeError, ValueError): + continue + sanitized.append( + { + "id": vehicle_id, + "modelo": str(item.get("modelo") or "").strip(), + "categoria": str(item.get("categoria") or "").strip(), + "preco": preco, + "budget_relaxed": bool(item.get("budget_relaxed", False)), + } + ) + return sanitized + + def get_last_stock_results(self, user_id: int | None) -> list[dict]: + pending_selection = self.get_state_entry("pending_stock_selections", user_id, expire=True) + if isinstance(pending_selection, dict): + payload = pending_selection.get("payload") + if isinstance(payload, list): + sanitized = self.sanitize_stock_results(payload) + if sanitized: + return sanitized + context = self.service._get_user_context(user_id) + if not context: + return [] + stock_results = context.get("last_stock_results") or [] + return self.sanitize_stock_results(stock_results if isinstance(stock_results, list) else []) + + def store_pending_stock_selection(self, user_id: int | None, stock_results: list[dict] | None) -> None: + if user_id is None: + return + sanitized = self.sanitize_stock_results(stock_results) + if not sanitized: + self.pop_state_entry("pending_stock_selections", user_id) + return + self.set_state_entry( + "pending_stock_selections", + user_id, + { + "payload": sanitized, + "expires_at": utc_now() + timedelta(minutes=PENDING_ORDER_SELECTION_TTL_MINUTES), + }, + ) + + def get_selected_vehicle(self, user_id: int | None) -> dict | None: + context = self.service._get_user_context(user_id) + if not context: + return None + selected_vehicle = context.get("selected_vehicle") + return dict(selected_vehicle) if isinstance(selected_vehicle, dict) else None + + def get_pending_single_vehicle_confirmation(self, user_id: int | None) -> dict | None: + context = self.service._get_user_context(user_id) + if not context: + return None + pending_vehicle = context.get("pending_single_vehicle_confirmation") + return dict(pending_vehicle) if isinstance(pending_vehicle, dict) else None + + def remember_stock_results(self, user_id: int | None, stock_results: list[dict] | None) -> None: + context = self.service._get_user_context(user_id) + if not context: + return + sanitized = self.sanitize_stock_results(stock_results) + context["last_stock_results"] = sanitized + self.store_pending_stock_selection(user_id=user_id, stock_results=sanitized) + if sanitized: + context["selected_vehicle"] = None + context["pending_single_vehicle_confirmation"] = None + self.service._save_user_context(user_id=user_id, context=context) + + def store_selected_vehicle(self, user_id: int | None, vehicle: dict | None) -> None: + if user_id is None: + return + context = self.service._get_user_context(user_id) + if not context: + return + context["selected_vehicle"] = dict(vehicle) if isinstance(vehicle, dict) else None + context["pending_single_vehicle_confirmation"] = None + self.pop_state_entry("pending_stock_selections", user_id) + self.service._save_user_context(user_id=user_id, context=context) + + def store_pending_single_vehicle_confirmation(self, user_id: int | None, vehicle: dict | None) -> None: + if user_id is None: + return + context = self.service._get_user_context(user_id) + if not context: + return + context["pending_single_vehicle_confirmation"] = dict(vehicle) if isinstance(vehicle, dict) else None + self.service._save_user_context(user_id=user_id, context=context) + + def clear_pending_single_vehicle_confirmation(self, user_id: int | None) -> None: + if user_id is None: + return + context = self.service._get_user_context(user_id) + if not isinstance(context, dict): + return + context["pending_single_vehicle_confirmation"] = None + self.service._save_user_context(user_id=user_id, context=context) diff --git a/app/services/flows/rental_flow.py b/app/services/flows/rental_flow.py index af5988d..4fa0122 100644 --- a/app/services/flows/rental_flow.py +++ b/app/services/flows/rental_flow.py @@ -7,185 +7,78 @@ from app.core.time_utils import utc_now from app.services.orchestration import technical_normalizer from app.services.orchestration.orchestrator_config import ( PENDING_RENTAL_DRAFT_TTL_MINUTES, - PENDING_RENTAL_SELECTION_TTL_MINUTES, RENTAL_REQUIRED_FIELDS, ) +from app.services.flows.rental_flow_support import RentalFlowStateSupport class RentalFlowMixin: + @property + def _rental_flow_state_support(self) -> RentalFlowStateSupport: + support = getattr(self, "__rental_flow_state_support", None) + if support is None: + support = RentalFlowStateSupport(self) + setattr(self, "__rental_flow_state_support", support) + return support + # Sanitiza resultados da frota antes de guardar no contexto. def _sanitize_rental_results(self, rental_results: list[dict] | None) -> list[dict]: - sanitized: list[dict] = [] - for item in rental_results or []: - if not isinstance(item, dict): - continue - try: - rental_vehicle_id = int(item.get("id")) - valor_diaria = float(item.get("valor_diaria") or 0) - ano = int(item.get("ano")) if item.get("ano") is not None else None - except (TypeError, ValueError): - continue - placa = technical_normalizer.normalize_plate(item.get("placa")) - if not placa: - continue - sanitized.append( - { - "id": rental_vehicle_id, - "placa": placa, - "modelo": str(item.get("modelo") or "").strip(), - "categoria": str(item.get("categoria") or "").strip().lower(), - "ano": ano, - "valor_diaria": valor_diaria, - "status": str(item.get("status") or "").strip().lower() or "disponivel", - } - ) - return sanitized + return self._rental_flow_state_support.sanitize_rental_results(rental_results) # Marca locacao como dominio ativo na conversa do usuario. def _mark_rental_flow_active(self, user_id: int | None, *, active_task: str | None = None) -> None: - if user_id is None: - return - context = self._get_user_context(user_id) - if not isinstance(context, dict): - return - context["active_domain"] = "rental" - if active_task is not None: - context["active_task"] = active_task - self._save_user_context(user_id=user_id, context=context) + self._rental_flow_state_support.mark_rental_flow_active( + user_id=user_id, + active_task=active_task, + ) # Recupera a ultima lista de veiculos disponiveis para locacao. def _get_last_rental_results(self, user_id: int | None) -> list[dict]: - pending_selection = self.state.get_entry("pending_rental_selections", user_id, expire=True) - if isinstance(pending_selection, dict): - payload = pending_selection.get("payload") - if isinstance(payload, list): - sanitized = self._sanitize_rental_results(payload) - if sanitized: - return sanitized - context = self._get_user_context(user_id) - if not isinstance(context, dict): - return [] - rental_results = context.get("last_rental_results") or [] - return self._sanitize_rental_results(rental_results if isinstance(rental_results, list) else []) + return self._rental_flow_state_support.get_last_rental_results(user_id=user_id) # Guarda a lista atual para permitir selecao do veiculo em mensagens seguintes. def _store_pending_rental_selection(self, user_id: int | None, rental_results: list[dict] | None) -> None: - if user_id is None: - return - sanitized = self._sanitize_rental_results(rental_results) - if not sanitized: - self.state.pop_entry("pending_rental_selections", user_id) - return - self.state.set_entry( - "pending_rental_selections", - user_id, - { - "payload": sanitized, - "expires_at": utc_now() + timedelta(minutes=PENDING_RENTAL_SELECTION_TTL_MINUTES), - }, + self._rental_flow_state_support.store_pending_rental_selection( + user_id=user_id, + rental_results=rental_results, ) # Le o veiculo de locacao escolhido que ficou salvo no contexto. def _get_selected_rental_vehicle(self, user_id: int | None) -> dict | None: - context = self._get_user_context(user_id) - if not isinstance(context, dict): - return None - selected_vehicle = context.get("selected_rental_vehicle") - return dict(selected_vehicle) if isinstance(selected_vehicle, dict) else None + return self._rental_flow_state_support.get_selected_rental_vehicle(user_id=user_id) # Filtra o payload do contrato para manter so dados uteis no contexto. def _sanitize_rental_contract_snapshot(self, payload) -> dict | None: - if not isinstance(payload, dict): - return None - - contract_number = str(payload.get("contrato_numero") or "").strip().upper() - plate = technical_normalizer.normalize_plate(payload.get("placa")) - if not contract_number and not plate: - return None - - snapshot: dict = {} - if contract_number: - snapshot["contrato_numero"] = contract_number - if plate: - snapshot["placa"] = plate - - for field_name in ( - "modelo_veiculo", - "categoria", - "status", - "status_veiculo", - "data_inicio", - "data_fim_prevista", - "data_devolucao", - ): - value = str(payload.get(field_name) or "").strip() - if value: - snapshot[field_name] = value - - for field_name in ("valor_diaria", "valor_previsto", "valor_final"): - number = technical_normalizer.normalize_positive_number(payload.get(field_name)) - if number is not None: - snapshot[field_name] = float(number) - - return snapshot + return self._rental_flow_state_support.sanitize_rental_contract_snapshot(payload) # Recupera o ultimo contrato de locacao lembrado para o usuario. def _get_last_rental_contract(self, user_id: int | None) -> dict | None: - context = self._get_user_context(user_id) - if not isinstance(context, dict): - return None - contract = context.get("last_rental_contract") - return dict(contract) if isinstance(contract, dict) else None + return self._rental_flow_state_support.get_last_rental_contract(user_id=user_id) # Atualiza o ultimo contrato de locacao salvo no contexto. def _store_last_rental_contract(self, user_id: int | None, payload) -> None: - if user_id is None: - return - context = self._get_user_context(user_id) - if not isinstance(context, dict): - return - sanitized = self._sanitize_rental_contract_snapshot(payload) - if sanitized is None: - context.pop("last_rental_contract", None) - else: - context["last_rental_contract"] = sanitized - self._save_user_context(user_id=user_id, context=context) - + self._rental_flow_state_support.store_last_rental_contract( + user_id=user_id, + payload=payload, + ) # Persiste a ultima consulta de frota para reuso no fluxo incremental. def _remember_rental_results(self, user_id: int | None, rental_results: list[dict] | None) -> None: - context = self._get_user_context(user_id) - if not isinstance(context, dict): - return - sanitized = self._sanitize_rental_results(rental_results) - context["last_rental_results"] = sanitized - self._store_pending_rental_selection(user_id=user_id, rental_results=sanitized) - if sanitized: - context["selected_rental_vehicle"] = None - context["active_domain"] = "rental" - self._save_user_context(user_id=user_id, context=context) + self._rental_flow_state_support.remember_rental_results( + user_id=user_id, + rental_results=rental_results, + ) # Salva o veiculo escolhido e encerra a etapa de selecao pendente. def _store_selected_rental_vehicle(self, user_id: int | None, vehicle: dict | None) -> None: - if user_id is None: - return - context = self._get_user_context(user_id) - if not isinstance(context, dict): - return - context["selected_rental_vehicle"] = dict(vehicle) if isinstance(vehicle, dict) else None - context["active_domain"] = "rental" - self.state.pop_entry("pending_rental_selections", user_id) - self._save_user_context(user_id=user_id, context=context) + self._rental_flow_state_support.store_selected_rental_vehicle( + user_id=user_id, + vehicle=vehicle, + ) # Converte um veiculo selecionado no payload esperado pela abertura da locacao. def _rental_vehicle_to_payload(self, vehicle: dict) -> dict: - return { - "rental_vehicle_id": int(vehicle["id"]), - "placa": str(vehicle["placa"]), - "modelo_veiculo": str(vehicle["modelo"]), - "categoria": str(vehicle.get("categoria") or ""), - "valor_diaria": round(float(vehicle.get("valor_diaria") or 0), 2), - } + return self._rental_flow_state_support.rental_vehicle_to_payload(vehicle) # Extrai a categoria de locacao mencionada livremente pelo usuario. def _extract_rental_category_from_text(self, text: str) -> str | None: @@ -720,3 +613,4 @@ class RentalFlowMixin: self._store_last_rental_contract(user_id=user_id, payload=tool_result) self._reset_pending_rental_states(user_id=user_id) return self._fallback_format_tool_result("abrir_locacao_aluguel", tool_result) + diff --git a/app/services/flows/rental_flow_support.py b/app/services/flows/rental_flow_support.py new file mode 100644 index 0000000..2b924e0 --- /dev/null +++ b/app/services/flows/rental_flow_support.py @@ -0,0 +1,174 @@ +from __future__ import annotations + +from datetime import timedelta + +from app.core.time_utils import utc_now +from app.services.flows.flow_state_support import FlowStateSupport +from app.services.orchestration import technical_normalizer +from app.services.orchestration.orchestrator_config import PENDING_RENTAL_SELECTION_TTL_MINUTES + + +class RentalFlowStateSupport(FlowStateSupport): + """Concentra estado e contexto incremental do fluxo de locacao.""" + + def sanitize_rental_results(self, rental_results: list[dict] | None) -> list[dict]: + sanitized: list[dict] = [] + for item in rental_results or []: + if not isinstance(item, dict): + continue + try: + rental_vehicle_id = int(item.get("id")) + valor_diaria = float(item.get("valor_diaria") or 0) + ano = int(item.get("ano")) if item.get("ano") is not None else None + except (TypeError, ValueError): + continue + placa = technical_normalizer.normalize_plate(item.get("placa")) + if not placa: + continue + sanitized.append( + { + "id": rental_vehicle_id, + "placa": placa, + "modelo": str(item.get("modelo") or "").strip(), + "categoria": str(item.get("categoria") or "").strip().lower(), + "ano": ano, + "valor_diaria": valor_diaria, + "status": str(item.get("status") or "").strip().lower() or "disponivel", + } + ) + return sanitized + + def mark_rental_flow_active(self, user_id: int | None, *, active_task: str | None = None) -> None: + if user_id is None: + return + context = self.service._get_user_context(user_id) + if not isinstance(context, dict): + return + context["active_domain"] = "rental" + if active_task is not None: + context["active_task"] = active_task + self.service._save_user_context(user_id=user_id, context=context) + + def get_last_rental_results(self, user_id: int | None) -> list[dict]: + pending_selection = self.get_state_entry("pending_rental_selections", user_id, expire=True) + if isinstance(pending_selection, dict): + payload = pending_selection.get("payload") + if isinstance(payload, list): + sanitized = self.sanitize_rental_results(payload) + if sanitized: + return sanitized + context = self.service._get_user_context(user_id) + if not isinstance(context, dict): + return [] + rental_results = context.get("last_rental_results") or [] + return self.sanitize_rental_results(rental_results if isinstance(rental_results, list) else []) + + def store_pending_rental_selection(self, user_id: int | None, rental_results: list[dict] | None) -> None: + if user_id is None: + return + sanitized = self.sanitize_rental_results(rental_results) + if not sanitized: + self.pop_state_entry("pending_rental_selections", user_id) + return + self.set_state_entry( + "pending_rental_selections", + user_id, + { + "payload": sanitized, + "expires_at": utc_now() + timedelta(minutes=PENDING_RENTAL_SELECTION_TTL_MINUTES), + }, + ) + + def get_selected_rental_vehicle(self, user_id: int | None) -> dict | None: + context = self.service._get_user_context(user_id) + if not isinstance(context, dict): + return None + selected_vehicle = context.get("selected_rental_vehicle") + return dict(selected_vehicle) if isinstance(selected_vehicle, dict) else None + + def sanitize_rental_contract_snapshot(self, payload) -> dict | None: + if not isinstance(payload, dict): + return None + + contract_number = str(payload.get("contrato_numero") or "").strip().upper() + plate = technical_normalizer.normalize_plate(payload.get("placa")) + if not contract_number and not plate: + return None + + snapshot: dict = {} + if contract_number: + snapshot["contrato_numero"] = contract_number + if plate: + snapshot["placa"] = plate + + for field_name in ( + "modelo_veiculo", + "categoria", + "status", + "status_veiculo", + "data_inicio", + "data_fim_prevista", + "data_devolucao", + ): + value = str(payload.get(field_name) or "").strip() + if value: + snapshot[field_name] = value + + for field_name in ("valor_diaria", "valor_previsto", "valor_final"): + number = technical_normalizer.normalize_positive_number(payload.get(field_name)) + if number is not None: + snapshot[field_name] = float(number) + + return snapshot + + def get_last_rental_contract(self, user_id: int | None) -> dict | None: + context = self.service._get_user_context(user_id) + if not isinstance(context, dict): + return None + contract = context.get("last_rental_contract") + return dict(contract) if isinstance(contract, dict) else None + + def store_last_rental_contract(self, user_id: int | None, payload) -> None: + if user_id is None: + return + context = self.service._get_user_context(user_id) + if not isinstance(context, dict): + return + sanitized = self.sanitize_rental_contract_snapshot(payload) + if sanitized is None: + context.pop("last_rental_contract", None) + else: + context["last_rental_contract"] = sanitized + self.service._save_user_context(user_id=user_id, context=context) + + def remember_rental_results(self, user_id: int | None, rental_results: list[dict] | None) -> None: + context = self.service._get_user_context(user_id) + if not isinstance(context, dict): + return + sanitized = self.sanitize_rental_results(rental_results) + context["last_rental_results"] = sanitized + self.store_pending_rental_selection(user_id=user_id, rental_results=sanitized) + if sanitized: + context["selected_rental_vehicle"] = None + context["active_domain"] = "rental" + self.service._save_user_context(user_id=user_id, context=context) + + def store_selected_rental_vehicle(self, user_id: int | None, vehicle: dict | None) -> None: + if user_id is None: + return + context = self.service._get_user_context(user_id) + if not isinstance(context, dict): + return + context["selected_rental_vehicle"] = dict(vehicle) if isinstance(vehicle, dict) else None + context["active_domain"] = "rental" + self.pop_state_entry("pending_rental_selections", user_id) + self.service._save_user_context(user_id=user_id, context=context) + + def rental_vehicle_to_payload(self, vehicle: dict) -> dict: + return { + "rental_vehicle_id": int(vehicle["id"]), + "placa": str(vehicle["placa"]), + "modelo_veiculo": str(vehicle["modelo"]), + "categoria": str(vehicle.get("categoria") or ""), + "valor_diaria": round(float(vehicle.get("valor_diaria") or 0), 2), + } diff --git a/app/services/flows/review_flow.py b/app/services/flows/review_flow.py index 3caab96..5f876f6 100644 --- a/app/services/flows/review_flow.py +++ b/app/services/flows/review_flow.py @@ -5,31 +5,30 @@ from app.core.time_utils import utc_now from fastapi import HTTPException from app.services.orchestration.orchestrator_config import ( - LAST_REVIEW_PACKAGE_TTL_MINUTES, PENDING_REVIEW_DRAFT_TTL_MINUTES, REVIEW_REQUIRED_FIELDS, ) +from app.services.flows.review_flow_support import ReviewFlowStateSupport # Esse mixin concentra os fluxos incrementais de revisao e pos-venda. class ReviewFlowMixin: + @property + def _review_flow_state_support(self) -> ReviewFlowStateSupport: + support = getattr(self, "__review_flow_state_support", None) + if support is None: + support = ReviewFlowStateSupport(self) + setattr(self, "__review_flow_state_support", support) + return support + def _review_now(self) -> datetime: - provider = getattr(self, "_review_now_provider", None) - if callable(provider): - return provider() - return datetime.now() + return self._review_flow_state_support.review_now() def _get_review_flow_snapshot(self, user_id: int | None, snapshot_key: str) -> dict | None: - if user_id is None or not hasattr(self, "_get_user_context"): - return None - context = self._get_user_context(user_id) - if not isinstance(context, dict): - return None - snapshots = context.get("flow_snapshots") - if not isinstance(snapshots, dict): - return None - snapshot = snapshots.get(snapshot_key) - return dict(snapshot) if isinstance(snapshot, dict) else None + return self._review_flow_state_support.get_flow_snapshot( + user_id=user_id, + snapshot_key=snapshot_key, + ) def _set_review_flow_snapshot( self, @@ -39,51 +38,19 @@ class ReviewFlowMixin: *, active_task: str | None = None, ) -> None: - if user_id is None or not hasattr(self, "_get_user_context") or not hasattr(self, "_save_user_context"): - return - context = self._get_user_context(user_id) - if not isinstance(context, dict): - return - snapshots = context.get("flow_snapshots") - if not isinstance(snapshots, dict): - snapshots = {} - context["flow_snapshots"] = snapshots - - if isinstance(value, dict): - snapshots[snapshot_key] = value - if active_task: - context["active_task"] = active_task - collected_slots = context.get("collected_slots") - if not isinstance(collected_slots, dict): - collected_slots = {} - context["collected_slots"] = collected_slots - payload = value.get("payload") - if isinstance(payload, dict): - collected_slots[active_task] = dict(payload) - else: - snapshots.pop(snapshot_key, None) - if active_task and context.get("active_task") == active_task: - context["active_task"] = None - collected_slots = context.get("collected_slots") - if isinstance(collected_slots, dict) and active_task: - collected_slots.pop(active_task, None) - - self._save_user_context(user_id=user_id, context=context) + self._review_flow_state_support.set_flow_snapshot( + user_id=user_id, + snapshot_key=snapshot_key, + value=value, + active_task=active_task, + ) def _get_review_flow_entry(self, bucket: str, user_id: int | None, snapshot_key: str) -> dict | None: - entry = self.state.get_entry(bucket, user_id, expire=True) - if entry: - return entry - - snapshot = self._get_review_flow_snapshot(user_id=user_id, snapshot_key=snapshot_key) - if not snapshot: - return None - if snapshot.get("expires_at") and snapshot["expires_at"] < utc_now(): - self._set_review_flow_snapshot(user_id=user_id, snapshot_key=snapshot_key, value=None) - return None - - self.state.set_entry(bucket, user_id, snapshot) - return snapshot + return self._review_flow_state_support.get_flow_entry( + bucket=bucket, + user_id=user_id, + snapshot_key=snapshot_key, + ) def _set_review_flow_entry( self, @@ -94,8 +61,8 @@ class ReviewFlowMixin: *, active_task: str | None = None, ) -> None: - self.state.set_entry(bucket, user_id, value) - self._set_review_flow_snapshot( + self._review_flow_state_support.set_flow_entry( + bucket=bucket, user_id=user_id, snapshot_key=snapshot_key, value=value, @@ -110,14 +77,12 @@ class ReviewFlowMixin: *, active_task: str | None = None, ) -> dict | None: - entry = self.state.pop_entry(bucket, user_id) - self._set_review_flow_snapshot( + return self._review_flow_state_support.pop_flow_entry( + bucket=bucket, user_id=user_id, snapshot_key=snapshot_key, - value=None, active_task=active_task, ) - return entry def _decision_intent(self, turn_decision: dict | None) -> str: return str((turn_decision or {}).get("intent") or "").strip().lower() @@ -128,22 +93,14 @@ class ReviewFlowMixin: payload: dict | None = None, missing_fields: list[str] | None = None, ) -> None: - if not hasattr(self, "_log_turn_event"): - return - self._log_turn_event( - "review_flow_progress", - review_flow_source=source, - payload_keys=sorted((payload or {}).keys()), - missing_fields=list(missing_fields or []), + self._review_flow_state_support.log_review_flow_source( + source=source, + payload=payload, + missing_fields=missing_fields, ) def _active_domain(self, user_id: int | None) -> str: - if user_id is None or not hasattr(self, "_get_user_context"): - return "general" - context = self._get_user_context(user_id) - if not isinstance(context, dict): - return "general" - return str(context.get("active_domain") or "general").strip().lower() + return self._review_flow_state_support.active_domain(user_id=user_id) def _clean_review_model_candidate(self, raw_model: str | None) -> str | None: text = str(raw_model or "").strip(" ,.;:-") @@ -659,38 +616,13 @@ class ReviewFlowMixin: ) def _store_last_review_package(self, user_id: int | None, payload: dict | None) -> None: - if user_id is None or not isinstance(payload, dict): - return - # Guarda um pacote reutilizavel do ultimo veiculo informado - # para reduzir repeticao em novos agendamentos. - package = { - "placa": payload.get("placa"), - "modelo": payload.get("modelo"), - "ano": payload.get("ano"), - "km": payload.get("km"), - "revisao_previa_concessionaria": payload.get("revisao_previa_concessionaria"), - } - sanitized = {k: v for k, v in package.items() if v is not None} - required = {"placa", "modelo", "ano", "km", "revisao_previa_concessionaria"} - if not required.issubset(sanitized.keys()): - return - self.state.set_entry( - "last_review_packages", - user_id, - { - "payload": sanitized, - "expires_at": utc_now() + timedelta(minutes=LAST_REVIEW_PACKAGE_TTL_MINUTES), - }, + self._review_flow_state_support.store_last_review_package( + user_id=user_id, + payload=payload, ) def _get_last_review_package(self, user_id: int | None) -> dict | None: - if user_id is None: - return None - cached = self.state.get_entry("last_review_packages", user_id, expire=True) - if not cached: - return None - payload = cached.get("payload") - return dict(payload) if isinstance(payload, dict) else None + return self._review_flow_state_support.get_last_review_package(user_id=user_id) async def _try_collect_and_schedule_review( self, @@ -968,3 +900,4 @@ class ReviewFlowMixin: self._store_last_review_package(user_id=user_id, payload=draft["payload"]) self._log_review_flow_source(source=review_flow_source or "draft", payload=draft["payload"]) return self._fallback_format_tool_result("agendar_revisao", tool_result) + diff --git a/app/services/flows/review_flow_support.py b/app/services/flows/review_flow_support.py new file mode 100644 index 0000000..ed13273 --- /dev/null +++ b/app/services/flows/review_flow_support.py @@ -0,0 +1,72 @@ +from __future__ import annotations + +from datetime import datetime, timedelta + +from app.core.time_utils import utc_now +from app.services.flows.flow_state_support import FlowStateSupport +from app.services.orchestration.orchestrator_config import LAST_REVIEW_PACKAGE_TTL_MINUTES + + +class ReviewFlowStateSupport(FlowStateSupport): + """Concentra estado e utilitarios de suporte do fluxo de revisao.""" + + def review_now(self) -> datetime: + provider = getattr(self.service, "_review_now_provider", None) + if callable(provider): + return provider() + return datetime.now() + + def log_review_flow_source( + self, + source: str, + payload: dict | None = None, + missing_fields: list[str] | None = None, + ) -> None: + if not hasattr(self.service, "_log_turn_event"): + return + self.service._log_turn_event( + "review_flow_progress", + review_flow_source=source, + payload_keys=sorted((payload or {}).keys()), + missing_fields=list(missing_fields or []), + ) + + def active_domain(self, user_id: int | None) -> str: + if user_id is None or not hasattr(self.service, "_get_user_context"): + return "general" + context = self.service._get_user_context(user_id) + if not isinstance(context, dict): + return "general" + return str(context.get("active_domain") or "general").strip().lower() + + def store_last_review_package(self, user_id: int | None, payload: dict | None) -> None: + if user_id is None or not isinstance(payload, dict): + return + package = { + "placa": payload.get("placa"), + "modelo": payload.get("modelo"), + "ano": payload.get("ano"), + "km": payload.get("km"), + "revisao_previa_concessionaria": payload.get("revisao_previa_concessionaria"), + } + sanitized = {key: value for key, value in package.items() if value is not None} + required = {"placa", "modelo", "ano", "km", "revisao_previa_concessionaria"} + if not required.issubset(sanitized.keys()): + return + self.set_state_entry( + "last_review_packages", + user_id, + { + "payload": sanitized, + "expires_at": utc_now() + timedelta(minutes=LAST_REVIEW_PACKAGE_TTL_MINUTES), + }, + ) + + def get_last_review_package(self, user_id: int | None) -> dict | None: + if user_id is None: + return None + cached = self.get_state_entry("last_review_packages", user_id, expire=True) + if not cached: + return None + payload = cached.get("payload") + return dict(payload) if isinstance(payload, dict) else None diff --git a/app/services/orchestration/orchestrator_context_manager.py b/app/services/orchestration/orchestrator_context_manager.py new file mode 100644 index 0000000..1c3c292 --- /dev/null +++ b/app/services/orchestration/orchestrator_context_manager.py @@ -0,0 +1,420 @@ +from __future__ import annotations + +from typing import Any + +from fastapi import HTTPException + +from app.services.orchestration.orchestrator_config import USER_CONTEXT_TTL_MINUTES + + +class OrchestratorContextManager: + """Agrupa a gestao de contexto e efeitos colaterais do turno.""" + + def __init__(self, service) -> None: + self.service = service + + def upsert_user_context(self, user_id: int | None) -> None: + override = self.service.__dict__.get("_upsert_user_context") + if callable(override): + override(user_id) + return + state = getattr(self.service, "state", None) + if state is None: + return + state.upsert_user_context( + user_id=user_id, + ttl_minutes=USER_CONTEXT_TTL_MINUTES, + ) + + def get_user_context(self, user_id: int | None) -> dict | None: + override = self.service.__dict__.get("_get_user_context") + if callable(override): + return override(user_id) + state = getattr(self.service, "state", None) + if state is None: + return None + return state.get_user_context(user_id) + + def save_user_context(self, user_id: int | None, context: dict | None) -> None: + if user_id is None or not isinstance(context, dict): + return + override = self.service.__dict__.get("_save_user_context") + if callable(override): + override(user_id, context) + return + state = getattr(self.service, "state", None) + if state is None: + return + state.save_user_context(user_id=user_id, context=context) + + def extract_generic_memory_fields(self, llm_generic_fields: dict | None = None) -> dict: + extracted: dict[str, Any] = {} + llm_fields = llm_generic_fields or {} + + normalized_plate = self.service._normalize_plate(llm_fields.get("placa")) + if normalized_plate: + extracted["placa"] = normalized_plate + + normalized_cpf = self.service._normalize_cpf(llm_fields.get("cpf")) + if normalized_cpf: + extracted["cpf"] = normalized_cpf + + normalized_budget = self.service._normalize_positive_number(llm_fields.get("orcamento_max")) + if normalized_budget: + extracted["orcamento_max"] = int(round(normalized_budget)) + + normalized_profile = self.service._normalize_vehicle_profile(llm_fields.get("perfil_veiculo")) + if normalized_profile: + extracted["perfil_veiculo"] = normalized_profile + + return extracted + + def capture_generic_memory( + self, + user_id: int | None, + llm_generic_fields: dict | None = None, + ) -> None: + context = self.get_user_context(user_id) + if not context: + return + fields = self.extract_generic_memory_fields(llm_generic_fields=llm_generic_fields) + if fields: + context["generic_memory"].update(fields) + context.setdefault("shared_memory", {}).update(fields) + self.save_user_context(user_id=user_id, context=context) + + def capture_tool_result_context( + self, + tool_name: str, + tool_result, + user_id: int | None, + ) -> None: + context = self.get_user_context(user_id) + if not context: + return + context["last_tool_result"] = { + "tool_name": tool_name, + "result_type": type(tool_result).__name__, + } + if tool_name == "consultar_frota_aluguel" and isinstance(tool_result, list): + sanitized_rental = self.service._sanitize_rental_results(tool_result[:20]) + context["last_rental_results"] = sanitized_rental + self.service._store_pending_rental_selection( + user_id=user_id, + rental_results=sanitized_rental, + ) + if sanitized_rental: + context["selected_rental_vehicle"] = None + context["active_domain"] = "rental" + self.save_user_context(user_id=user_id, context=context) + return + + if tool_name != "consultar_estoque" or not isinstance(tool_result, list): + self.save_user_context(user_id=user_id, context=context) + return + + sanitized = self.service._sanitize_stock_results(tool_result[:5]) + context["last_stock_results"] = sanitized + self.service._store_pending_stock_selection( + user_id=user_id, + stock_results=sanitized, + ) + if sanitized: + context["selected_vehicle"] = None + self.save_user_context(user_id=user_id, context=context) + + def capture_successful_tool_side_effects( + self, + tool_name: str, + arguments: dict | None, + tool_result, + user_id: int | None, + ) -> None: + if tool_name == "agendar_revisao" and isinstance(arguments, dict): + self.service._store_last_review_package(user_id=user_id, payload=arguments) + if tool_name in { + "abrir_locacao_aluguel", + "registrar_devolucao_aluguel", + "registrar_pagamento_aluguel", + "registrar_multa_aluguel", + } and isinstance(tool_result, dict): + self.service._store_last_rental_contract(user_id=user_id, payload=tool_result) + self.capture_tool_result_context( + tool_name=tool_name, + tool_result=tool_result, + user_id=user_id, + ) + + async def maybe_build_stock_suggestion_response( + self, + tool_name: str, + arguments: dict | None, + tool_result, + user_id: int | None, + ) -> str | None: + if tool_name != "consultar_estoque" or not isinstance(tool_result, list) or tool_result: + return None + + budget = self.service._normalize_positive_number((arguments or {}).get("preco_max")) + if not budget: + return None + + relaxed_arguments = dict(arguments or {}) + relaxed_arguments["preco_max"] = max(float(budget) * 1.2, float(budget) + 10000.0) + relaxed_arguments["limite"] = min(max(int((arguments or {}).get("limite") or 5), 1), 5) + relaxed_arguments["ordenar_preco"] = "asc" + + try: + relaxed_result = await self.service.tool_executor.execute( + "consultar_estoque", + relaxed_arguments, + user_id=user_id, + ) + except HTTPException: + return None + + if not isinstance(relaxed_result, list): + return None + + nearby = [] + for item in relaxed_result: + if not isinstance(item, dict): + continue + try: + price = float(item.get("preco") or 0) + except (TypeError, ValueError): + continue + if price > float(budget): + nearby.append(item) + + if not nearby: + return None + + nearby = [{**item, "budget_relaxed": True} for item in nearby] + self.capture_tool_result_context( + tool_name="consultar_estoque", + tool_result=nearby, + user_id=user_id, + ) + + budget_label = f"R$ {float(budget):,.0f}".replace(",", ".") + lines = [f"Nao encontrei veiculos ate {budget_label}."] + lines.append("Mas achei algumas opcoes proximas ao seu orcamento:") + for idx, item in enumerate(nearby[:5], start=1): + modelo = str(item.get("modelo") or "N/A") + categoria = str(item.get("categoria") or "N/A") + codigo = item.get("id", "N/A") + preco = f"R$ {float(item.get('preco') or 0):,.2f}".replace(",", "X").replace(".", ",").replace("X", ".") + lines.append(f"{idx}. [{codigo}] {modelo} ({categoria}) - {preco}") + lines.append("Se quiser, responda com o numero da lista ou com o modelo.") + return "\n".join(lines) + + def new_tab_memory(self, user_id: int | None) -> dict: + context = self.get_user_context(user_id) + if not context: + return {} + shared = context.get("shared_memory", {}) + if not isinstance(shared, dict): + return {} + return dict(shared) + + def reset_pending_rental_states(self, user_id: int | None) -> None: + if user_id is None: + return + self.service.state.pop_entry("pending_rental_drafts", user_id) + self.service.state.pop_entry("pending_rental_selections", user_id) + context = self.get_user_context(user_id) + if isinstance(context, dict): + context["last_rental_results"] = [] + context["selected_rental_vehicle"] = None + if context.get("active_task") == "rental_create": + context["active_task"] = None + if str(context.get("active_domain") or "").strip().lower() == "rental": + context["active_domain"] = "general" + self.save_user_context(user_id=user_id, context=context) + + def reset_pending_review_states(self, user_id: int | None) -> None: + if user_id is None: + return + self.service.state.pop_entry("pending_review_drafts", user_id) + self.service.state.pop_entry("pending_review_confirmations", user_id) + self.service.state.pop_entry("pending_review_management_drafts", user_id) + self.service.state.pop_entry("pending_review_reuse_confirmations", user_id) + context = self.get_user_context(user_id) + if isinstance(context, dict): + snapshots = context.get("flow_snapshots") + if isinstance(snapshots, dict): + snapshots.pop("review_schedule", None) + snapshots.pop("review_confirmation", None) + snapshots.pop("review_management", None) + snapshots.pop("review_reuse_confirmation", None) + collected_slots = context.get("collected_slots") + if isinstance(collected_slots, dict): + collected_slots.pop("review_schedule", None) + collected_slots.pop("review_management", None) + if context.get("active_task") in {"review_schedule", "review_management"}: + context["active_task"] = None + self.save_user_context(user_id=user_id, context=context) + + def reset_pending_order_states(self, user_id: int | None) -> None: + if user_id is None: + return + self.service.state.pop_entry("pending_order_drafts", user_id) + self.service.state.pop_entry("pending_cancel_order_drafts", user_id) + self.service.state.pop_entry("pending_stock_selections", user_id) + context = self.get_user_context(user_id) + if isinstance(context, dict): + snapshots = context.get("flow_snapshots") + if isinstance(snapshots, dict): + snapshots.pop("order_create", None) + snapshots.pop("order_cancel", None) + collected_slots = context.get("collected_slots") + if isinstance(collected_slots, dict): + collected_slots.pop("order_create", None) + collected_slots.pop("order_cancel", None) + if context.get("active_task") in {"order_create", "order_cancel"}: + context["active_task"] = None + self.save_user_context(user_id=user_id, context=context) + + def clear_user_conversation_state(self, user_id: int | None) -> None: + context = self.get_user_context(user_id) + if not context: + return + self.reset_pending_review_states(user_id=user_id) + self.reset_pending_order_states(user_id=user_id) + self.reset_pending_rental_states(user_id=user_id) + self.service.state.pop_entry("last_review_packages", user_id) + context["active_domain"] = "general" + context["active_task"] = None + context["generic_memory"] = {} + context["shared_memory"] = {} + context["collected_slots"] = {} + context["flow_snapshots"] = {} + context["last_tool_result"] = None + context["order_queue"] = [] + context["pending_order_selection"] = None + context["pending_switch"] = None + context["last_stock_results"] = [] + context["selected_vehicle"] = None + context["last_rental_results"] = [] + context["selected_rental_vehicle"] = None + context.pop("last_rental_contract", None) + self.save_user_context(user_id=user_id, context=context) + + def clear_pending_order_navigation(self, user_id: int | None) -> int: + context = self.get_user_context(user_id) + if not context: + return 0 + dropped = len(context.get("order_queue", [])) + if context.get("pending_switch"): + dropped += 1 + if context.get("pending_order_selection"): + pending_orders = context["pending_order_selection"].get("orders") or [] + dropped += len(pending_orders) + context["order_queue"] = [] + context["pending_switch"] = None + context["pending_order_selection"] = None + self.save_user_context(user_id=user_id, context=context) + return dropped + + def cancel_active_flow(self, user_id: int | None) -> str: + context = self.get_user_context(user_id) + if not context: + return "Nao havia contexto ativo para cancelar." + + active_domain = context.get("active_domain", "general") + had_flow = self.service._has_open_flow(user_id=user_id, domain=active_domain) + if active_domain == "review": + self.reset_pending_review_states(user_id=user_id) + elif active_domain == "sales": + self.reset_pending_order_states(user_id=user_id) + elif active_domain == "rental": + self.reset_pending_rental_states(user_id=user_id) + + context["pending_switch"] = None + self.save_user_context(user_id=user_id, context=context) + if had_flow: + return f"Fluxo atual de {self.service._domain_label(active_domain)} cancelado." + return "Nao havia fluxo em andamento para cancelar." + + async def continue_next_order_now(self, user_id: int | None) -> str: + context = self.get_user_context(user_id) + if not context: + return "Nao encontrei contexto ativo para continuar." + + if context.get("pending_order_selection"): + return "Ainda preciso que voce escolha qual das duas acoes deseja iniciar primeiro." + + pending_switch = context.get("pending_switch") + if isinstance(pending_switch, dict): + queued_message = str(pending_switch.get("queued_message") or "").strip() + if queued_message: + target_domain = str(pending_switch.get("target_domain") or "general") + memory_seed = dict(pending_switch.get("memory_seed") or {}) + self.service._apply_domain_switch(user_id=user_id, target_domain=target_domain) + refreshed = self.get_user_context(user_id) + if refreshed is not None: + refreshed["generic_memory"] = memory_seed + self.save_user_context(user_id=user_id, context=refreshed) + transition = self.service._build_next_order_transition(target_domain) + next_response = await self.service.handle_message(queued_message, user_id=user_id) + return f"{transition}\n{next_response}" + + next_order = self.service._pop_next_order(user_id=user_id) + if not next_order: + return "Nao ha pedidos pendentes na fila para continuar." + + target_domain = str(next_order.get("domain") or "general") + memory_seed = dict(next_order.get("memory_seed") or self.new_tab_memory(user_id=user_id)) + self.service._apply_domain_switch(user_id=user_id, target_domain=target_domain) + refreshed = self.get_user_context(user_id) + if refreshed is not None: + refreshed["generic_memory"] = memory_seed + self.save_user_context(user_id=user_id, context=refreshed) + transition = self.service._build_next_order_transition(target_domain) + next_response = await self.service.handle_message( + str(next_order.get("message") or ""), + user_id=user_id, + ) + return f"{transition}\n{next_response}" + + async def tool_limpar_contexto_conversa( + self, + motivo: str | None = None, + user_id: int | None = None, + ) -> dict: + self.clear_user_conversation_state(user_id=user_id) + message = "Contexto da conversa limpo. Podemos recomecar do zero." + if motivo: + message = f"{message}\nMotivo registrado: {motivo.strip()}" + return {"message": message} + + async def tool_descartar_pedidos_pendentes( + self, + motivo: str | None = None, + user_id: int | None = None, + ) -> dict: + dropped = self.clear_pending_order_navigation(user_id=user_id) + if dropped <= 0: + message = "Nao havia pedidos pendentes na fila para descartar." + elif dropped == 1: + message = "Descartei 1 pedido pendente da fila." + else: + message = f"Descartei {dropped} pedidos pendentes da fila." + if motivo: + message = f"{message}\nMotivo registrado: {motivo.strip()}" + return {"message": message} + + async def tool_cancelar_fluxo_atual( + self, + motivo: str | None = None, + user_id: int | None = None, + ) -> dict: + message = self.cancel_active_flow(user_id=user_id) + if motivo: + message = f"{message}\nMotivo registrado: {motivo.strip()}" + return {"message": message} + + async def tool_continuar_proximo_pedido(self, user_id: int | None = None) -> str: + return await self.continue_next_order_now(user_id=user_id) + diff --git a/app/services/orchestration/orchestrator_execution_manager.py b/app/services/orchestration/orchestrator_execution_manager.py new file mode 100644 index 0000000..097b409 --- /dev/null +++ b/app/services/orchestration/orchestrator_execution_manager.py @@ -0,0 +1,296 @@ +from __future__ import annotations + +import json +import logging +from time import perf_counter +from typing import Any + +from fastapi import HTTPException + +from app.core.time_utils import utc_now +from app.services.orchestration.entity_normalizer import EntityNormalizer +from app.services.orchestration.orchestrator_config import ( + DETERMINISTIC_RESPONSE_TOOLS, + LOW_VALUE_RESPONSES, + ORCHESTRATION_CONTROL_TOOLS, +) +from app.services.orchestration.prompt_builders import ( + build_force_tool_prompt, + build_result_prompt, + build_router_prompt, +) +from app.services.orchestration.sensitive_data import mask_sensitive_payload, mask_sensitive_text + +logger = logging.getLogger(__name__) + + +class OrchestratorExecutionManager: + """Centraliza instrumentacao, prompts e execucao tecnica de tools.""" + + def __init__(self, service, logger_instance=None) -> None: + self.service = service + self.logger = logger_instance or logger + + def build_router_prompt(self, user_message: str, user_id: int | None) -> str: + conversation_context = self.service._build_context_summary(user_id=user_id) + return build_router_prompt( + user_message=user_message, + user_id=user_id, + conversation_context=conversation_context, + ) + + def build_force_tool_prompt(self, user_message: str, user_id: int | None) -> str: + conversation_context = self.service._build_context_summary(user_id=user_id) + return build_force_tool_prompt( + user_message=user_message, + user_id=user_id, + conversation_context=conversation_context, + ) + + def build_result_prompt( + self, + user_message: str, + user_id: int | None, + tool_name: str, + tool_result, + ) -> str: + conversation_context = self.service._build_context_summary(user_id=user_id) + return build_result_prompt( + user_message=user_message, + user_id=user_id, + tool_name=tool_name, + tool_result=tool_result, + conversation_context=conversation_context, + ) + + def capture_turn_decision_trace(self, turn_decision: dict | None) -> None: + trace = getattr(self.service, "_turn_trace", None) + if not isinstance(trace, dict) or not isinstance(turn_decision, dict): + return + trace["intent"] = str(turn_decision.get("intent") or "").strip() or None + trace["domain"] = str(turn_decision.get("domain") or "").strip() or None + trace["action"] = str(turn_decision.get("action") or "").strip() or None + + def capture_tool_invocation_trace(self, tool_name: str, arguments: dict | None) -> None: + trace = getattr(self.service, "_turn_trace", None) + if not isinstance(trace, dict): + return + trace["tool_name"] = str(tool_name or "").strip() or None + trace["tool_arguments"] = mask_sensitive_payload(dict(arguments or {})) if isinstance(arguments, dict) else None + + def finalize_turn_history( + self, + *, + user_message: str, + assistant_response: str | None, + turn_status: str, + error_detail: str | None = None, + ) -> None: + history_service = getattr(self.service, "history_service", None) + if history_service is None: + return + + trace = getattr(self.service, "_turn_trace", {}) or {} + history_service.record_turn( + request_id=str(trace.get("request_id") or ""), + conversation_id=str(trace.get("conversation_id") or "anonymous"), + user_id=trace.get("user_id"), + user_message=str(user_message or ""), + assistant_response=assistant_response, + turn_status=str(turn_status or "completed"), + intent=trace.get("intent"), + domain=trace.get("domain"), + action=trace.get("action"), + tool_name=trace.get("tool_name"), + tool_arguments=trace.get("tool_arguments"), + error_detail=error_detail, + started_at=trace.get("started_at"), + completed_at=utc_now(), + elapsed_ms=trace.get("elapsed_ms"), + ) + + def format_turn_error(self, exc: Exception) -> str: + if isinstance(exc, HTTPException): + detail = exc.detail + if isinstance(detail, dict): + return json.dumps(mask_sensitive_payload(detail), ensure_ascii=True, separators=(",", ":"), default=str) + return str(mask_sensitive_text(str(detail))) + return str(mask_sensitive_text(f"{type(exc).__name__}: {exc}")) + + def log_turn_event(self, event: str, **payload) -> None: + trace = getattr(self.service, "_turn_trace", {}) or {} + safe_payload = mask_sensitive_payload( + { + "request_id": trace.get("request_id"), + "conversation_id": trace.get("conversation_id"), + **payload, + } + ) + self.logger.info( + "turn_event=%s payload=%s", + event, + safe_payload, + ) + + async def call_llm_with_trace(self, operation: str, message: str, tools): + started_at = perf_counter() + try: + result = await self.service.llm.generate_response(message=message, tools=tools) + elapsed_ms = round((perf_counter() - started_at) * 1000, 2) + self.log_turn_event( + "llm_completed", + operation=operation, + elapsed_ms=elapsed_ms, + tool_call=bool(result.get("tool_call")), + ) + return result + except Exception: + elapsed_ms = round((perf_counter() - started_at) * 1000, 2) + self.log_turn_event( + "llm_failed", + operation=operation, + elapsed_ms=elapsed_ms, + ) + raise + + def merge_pending_draft_tool_arguments( + self, + tool_name: str, + arguments: dict, + user_id: int | None, + ) -> dict: + if user_id is None or not isinstance(arguments, dict): + return dict(arguments or {}) + if not hasattr(self.service, "state") or self.service.state is None: + return dict(arguments) + + bucket_map = { + "agendar_revisao": "pending_review_drafts", + "realizar_pedido": "pending_order_drafts", + "cancelar_pedido": "pending_cancel_order_drafts", + "cancelar_agendamento_revisao": "pending_review_management_drafts", + "editar_data_revisao": "pending_review_management_drafts", + } + bucket = bucket_map.get(tool_name) + if not bucket: + return dict(arguments) + + draft = self.service.state.get_entry(bucket, user_id, expire=True) + if not isinstance(draft, dict): + return dict(arguments) + payload = draft.get("payload") + if not isinstance(payload, dict): + return dict(arguments) + + merged_arguments = dict(payload) + merged_arguments.update(arguments) + return merged_arguments + + def normalize_tool_invocation( + self, + tool_name: str, + arguments: dict | None, + user_id: int | None, + ) -> tuple[str, dict]: + normalizer = getattr(self.service, "normalizer", None) + if normalizer is None: + normalizer = EntityNormalizer() + self.service.normalizer = normalizer + normalized_tool_name = normalizer.normalize_tool_name(tool_name) or str(tool_name or "").strip() + normalized_arguments = normalizer.normalize_tool_arguments(normalized_tool_name, arguments or {}) + normalized_arguments = self.merge_pending_draft_tool_arguments( + tool_name=normalized_tool_name, + arguments=normalized_arguments, + user_id=user_id, + ) + return normalized_tool_name, normalized_arguments + + async def execute_tool_with_trace(self, tool_name: str, arguments: dict, user_id: int | None): + tool_name, arguments = self.normalize_tool_invocation( + tool_name=tool_name, + arguments=arguments, + user_id=user_id, + ) + self.capture_tool_invocation_trace(tool_name=tool_name, arguments=arguments) + started_at = perf_counter() + try: + result = await self.service.tool_executor.execute(tool_name, arguments, user_id=user_id) + elapsed_ms = round((perf_counter() - started_at) * 1000, 2) + self.log_turn_event( + "tool_completed", + tool_name=tool_name, + elapsed_ms=elapsed_ms, + arguments=arguments, + result=result, + ) + return result + except HTTPException as exc: + elapsed_ms = round((perf_counter() - started_at) * 1000, 2) + self.log_turn_event( + "tool_failed", + tool_name=tool_name, + elapsed_ms=elapsed_ms, + arguments=arguments, + error=self.service.tool_executor.coerce_http_error(exc), + ) + raise + + async def render_tool_response_with_fallback( + self, + user_message: str, + user_id: int | None, + tool_name: str, + tool_result, + ) -> str: + fallback_response = self.fallback_format_tool_result(tool_name, tool_result) + if self.should_use_deterministic_response(tool_name): + self.log_turn_event( + "tool_response_fallback", + tool_name=tool_name, + reason="deterministic_tool", + ) + return fallback_response + + try: + final_response = await self.call_llm_with_trace( + operation="tool_result_response", + message=self.build_result_prompt( + user_message=user_message, + user_id=user_id, + tool_name=tool_name, + tool_result=tool_result, + ), + tools=[], + ) + except Exception: + self.log_turn_event( + "tool_response_fallback", + tool_name=tool_name, + reason="llm_failure", + ) + return fallback_response + + text = (final_response.get("response") or "").strip() + if self.is_low_value_response(text): + self.log_turn_event( + "tool_response_fallback", + tool_name=tool_name, + reason="low_value_response", + ) + return fallback_response + return text or fallback_response + + def should_use_deterministic_response(self, tool_name: str) -> bool: + return tool_name in DETERMINISTIC_RESPONSE_TOOLS or tool_name in ORCHESTRATION_CONTROL_TOOLS + + def is_low_value_response(self, text: str) -> bool: + return text.strip().lower() in LOW_VALUE_RESPONSES + + def http_exception_detail(self, exc: HTTPException) -> str: + return self.service.tool_executor.http_exception_detail(exc) + + def fallback_format_tool_result(self, tool_name: str, tool_result: Any) -> str: + return self.service.tool_executor.fallback_format_tool_result( + tool_name=tool_name, + tool_result=tool_result, + ) diff --git a/app/services/orchestration/orquestrador_service.py b/app/services/orchestration/orquestrador_service.py index c04d0af..883beac 100644 --- a/app/services/orchestration/orquestrador_service.py +++ b/app/services/orchestration/orquestrador_service.py @@ -23,6 +23,8 @@ from app.services.orchestration.message_planner import MessagePlanner from app.services.orchestration.conversation_history_service import ConversationHistoryService from app.services.orchestration.sensitive_data import mask_sensitive_payload, mask_sensitive_text from app.services.orchestration.state_repository_factory import get_conversation_state_repository +from app.services.orchestration.orchestrator_context_manager import OrchestratorContextManager +from app.services.orchestration.orchestrator_execution_manager import OrchestratorExecutionManager from app.services.flows.order_flow import OrderFlowMixin from app.services.flows.rental_flow import RentalFlowMixin from app.services.orchestration.prompt_builders import ( @@ -55,6 +57,22 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin, RentalFlowMixin): self.policy = ConversationPolicy(service=self) self.history_service = ConversationHistoryService() + @property + def _context_manager(self) -> OrchestratorContextManager: + manager = getattr(self, "__context_manager", None) + if manager is None: + manager = OrchestratorContextManager(service=self) + setattr(self, "__context_manager", manager) + return manager + + @property + def _execution_manager(self) -> OrchestratorExecutionManager: + manager = getattr(self, "__execution_manager", None) + if manager is None: + manager = OrchestratorExecutionManager(service=self, logger_instance=logger) + setattr(self, "__execution_manager", manager) + return manager + def _build_orchestration_tool_handlers(self) -> dict: return { "limpar_contexto_conversa": self._tool_limpar_contexto_conversa, @@ -1346,251 +1364,83 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin, RentalFlowMixin): # Limpa drafts e selecoes de locacao quando o fluxo termina ou e abortado. def _reset_pending_rental_states(self, user_id: int | None) -> None: - if user_id is None: - return - self.state.pop_entry("pending_rental_drafts", user_id) - self.state.pop_entry("pending_rental_selections", user_id) - context = self._get_user_context(user_id) - if isinstance(context, dict): - context["last_rental_results"] = [] - context["selected_rental_vehicle"] = None - if context.get("active_task") == "rental_create": - context["active_task"] = None - if str(context.get("active_domain") or "").strip().lower() == "rental": - context["active_domain"] = "general" - self._save_user_context(user_id=user_id, context=context) + self._context_manager.reset_pending_rental_states(user_id=user_id) def _reset_pending_review_states(self, user_id: int | None) -> None: - if user_id is None: - return - self.state.pop_entry("pending_review_drafts", user_id) - self.state.pop_entry("pending_review_confirmations", user_id) - self.state.pop_entry("pending_review_management_drafts", user_id) - self.state.pop_entry("pending_review_reuse_confirmations", user_id) - context = self._get_user_context(user_id) - if isinstance(context, dict): - snapshots = context.get("flow_snapshots") - if isinstance(snapshots, dict): - snapshots.pop("review_schedule", None) - snapshots.pop("review_confirmation", None) - snapshots.pop("review_management", None) - snapshots.pop("review_reuse_confirmation", None) - collected_slots = context.get("collected_slots") - if isinstance(collected_slots, dict): - collected_slots.pop("review_schedule", None) - collected_slots.pop("review_management", None) - if context.get("active_task") in {"review_schedule", "review_management"}: - context["active_task"] = None - self._save_user_context(user_id=user_id, context=context) + self._context_manager.reset_pending_review_states(user_id=user_id) def _reset_pending_order_states(self, user_id: int | None) -> None: - if user_id is None: - return - self.state.pop_entry("pending_order_drafts", user_id) - self.state.pop_entry("pending_cancel_order_drafts", user_id) - self.state.pop_entry("pending_stock_selections", user_id) - context = self._get_user_context(user_id) - if isinstance(context, dict): - snapshots = context.get("flow_snapshots") - if isinstance(snapshots, dict): - snapshots.pop("order_create", None) - snapshots.pop("order_cancel", None) - collected_slots = context.get("collected_slots") - if isinstance(collected_slots, dict): - collected_slots.pop("order_create", None) - collected_slots.pop("order_cancel", None) - if context.get("active_task") in {"order_create", "order_cancel"}: - context["active_task"] = None - self._save_user_context(user_id=user_id, context=context) + self._context_manager.reset_pending_order_states(user_id=user_id) def _clear_user_conversation_state(self, user_id: int | None) -> None: - context = self._get_user_context(user_id) - if not context: - return - self._reset_pending_review_states(user_id=user_id) - self._reset_pending_order_states(user_id=user_id) - self._reset_pending_rental_states(user_id=user_id) - self.state.pop_entry("last_review_packages", user_id) - context["active_domain"] = "general" - context["active_task"] = None - context["generic_memory"] = {} - context["shared_memory"] = {} - context["collected_slots"] = {} - context["flow_snapshots"] = {} - context["last_tool_result"] = None - context["order_queue"] = [] - context["pending_order_selection"] = None - context["pending_switch"] = None - context["last_stock_results"] = [] - context["selected_vehicle"] = None - context["last_rental_results"] = [] - context["selected_rental_vehicle"] = None - context.pop("last_rental_contract", None) - self._save_user_context(user_id=user_id, context=context) + self._context_manager.clear_user_conversation_state(user_id=user_id) def _clear_pending_order_navigation(self, user_id: int | None) -> int: - context = self._get_user_context(user_id) - if not context: - return 0 - dropped = len(context.get("order_queue", [])) - if context.get("pending_switch"): - dropped += 1 - if context.get("pending_order_selection"): - pending_orders = context["pending_order_selection"].get("orders") or [] - dropped += len(pending_orders) - context["order_queue"] = [] - context["pending_switch"] = None - context["pending_order_selection"] = None - self._save_user_context(user_id=user_id, context=context) - return dropped + return self._context_manager.clear_pending_order_navigation(user_id=user_id) def _cancel_active_flow(self, user_id: int | None) -> str: - context = self._get_user_context(user_id) - if not context: - return "Nao havia contexto ativo para cancelar." - - active_domain = context.get("active_domain", "general") - had_flow = self._has_open_flow(user_id=user_id, domain=active_domain) - if active_domain == "review": - self._reset_pending_review_states(user_id=user_id) - elif active_domain == "sales": - self._reset_pending_order_states(user_id=user_id) - elif active_domain == "rental": - self._reset_pending_rental_states(user_id=user_id) - - context["pending_switch"] = None - self._save_user_context(user_id=user_id, context=context) - if had_flow: - return f"Fluxo atual de {self._domain_label(active_domain)} cancelado." - return "Nao havia fluxo em andamento para cancelar." + return self._context_manager.cancel_active_flow(user_id=user_id) async def _continue_next_order_now(self, user_id: int | None) -> str: - context = self._get_user_context(user_id) - if not context: - return "Nao encontrei contexto ativo para continuar." - - if context.get("pending_order_selection"): - return "Ainda preciso que voce escolha qual das duas acoes deseja iniciar primeiro." - - pending_switch = context.get("pending_switch") - if isinstance(pending_switch, dict): - queued_message = str(pending_switch.get("queued_message") or "").strip() - if queued_message: - target_domain = str(pending_switch.get("target_domain") or "general") - memory_seed = dict(pending_switch.get("memory_seed") or {}) - self._apply_domain_switch(user_id=user_id, target_domain=target_domain) - refreshed = self._get_user_context(user_id) - if refreshed is not None: - refreshed["generic_memory"] = memory_seed - self._save_user_context(user_id=user_id, context=refreshed) - transition = self._build_next_order_transition(target_domain) - next_response = await self.handle_message(queued_message, user_id=user_id) - return f"{transition}\n{next_response}" - - next_order = self._pop_next_order(user_id=user_id) - if not next_order: - return "Nao ha pedidos pendentes na fila para continuar." - - target_domain = str(next_order.get("domain") or "general") - memory_seed = dict(next_order.get("memory_seed") or self._new_tab_memory(user_id=user_id)) - self._apply_domain_switch(user_id=user_id, target_domain=target_domain) - refreshed = self._get_user_context(user_id) - if refreshed is not None: - refreshed["generic_memory"] = memory_seed - self._save_user_context(user_id=user_id, context=refreshed) - transition = self._build_next_order_transition(target_domain) - next_response = await self.handle_message(str(next_order.get("message") or ""), user_id=user_id) - return f"{transition}\n{next_response}" + return await self._context_manager.continue_next_order_now(user_id=user_id) async def _tool_limpar_contexto_conversa( self, motivo: str | None = None, user_id: int | None = None, ) -> dict: - self._clear_user_conversation_state(user_id=user_id) - message = "Contexto da conversa limpo. Podemos recomecar do zero." - if motivo: - message = f"{message}\nMotivo registrado: {motivo.strip()}" - return {"message": message} + return await self._context_manager.tool_limpar_contexto_conversa( + motivo=motivo, + user_id=user_id, + ) async def _tool_descartar_pedidos_pendentes( self, motivo: str | None = None, user_id: int | None = None, ) -> dict: - dropped = self._clear_pending_order_navigation(user_id=user_id) - if dropped <= 0: - message = "Nao havia pedidos pendentes na fila para descartar." - elif dropped == 1: - message = "Descartei 1 pedido pendente da fila." - else: - message = f"Descartei {dropped} pedidos pendentes da fila." - if motivo: - message = f"{message}\nMotivo registrado: {motivo.strip()}" - return {"message": message} + return await self._context_manager.tool_descartar_pedidos_pendentes( + motivo=motivo, + user_id=user_id, + ) async def _tool_cancelar_fluxo_atual( self, motivo: str | None = None, user_id: int | None = None, ) -> dict: - message = self._cancel_active_flow(user_id=user_id) - if motivo: - message = f"{message}\nMotivo registrado: {motivo.strip()}" - return {"message": message} + return await self._context_manager.tool_cancelar_fluxo_atual( + motivo=motivo, + user_id=user_id, + ) async def _tool_continuar_proximo_pedido(self, user_id: int | None = None) -> str: - return await self._continue_next_order_now(user_id=user_id) + return await self._context_manager.tool_continuar_proximo_pedido(user_id=user_id) # Nessa funcao eu configuro a memoria volatil do sistema def _upsert_user_context(self, user_id: int | None) -> None: - self.state.upsert_user_context(user_id=user_id, ttl_minutes=USER_CONTEXT_TTL_MINUTES) + self._context_manager.upsert_user_context(user_id=user_id) def _get_user_context(self, user_id: int | None) -> dict | None: - return self.state.get_user_context(user_id) + return self._context_manager.get_user_context(user_id) def _save_user_context(self, user_id: int | None, context: dict | None) -> None: - if user_id is None or not isinstance(context, dict): - return - self.state.save_user_context(user_id=user_id, context=context) + self._context_manager.save_user_context(user_id=user_id, context=context) def _extract_generic_memory_fields(self, llm_generic_fields: dict | None = None) -> dict: - extracted: dict = {} - llm_fields = llm_generic_fields or {} - - normalized_plate = self._normalize_plate(llm_fields.get("placa")) - if normalized_plate: - extracted["placa"] = normalized_plate - - normalized_cpf = self._normalize_cpf(llm_fields.get("cpf")) - if normalized_cpf: - extracted["cpf"] = normalized_cpf - - normalized_budget = self._normalize_positive_number(llm_fields.get("orcamento_max")) - if normalized_budget: - extracted["orcamento_max"] = int(round(normalized_budget)) - - normalized_profile = self._normalize_vehicle_profile(llm_fields.get("perfil_veiculo")) - if normalized_profile: - extracted["perfil_veiculo"] = normalized_profile - - return extracted + return self._context_manager.extract_generic_memory_fields( + llm_generic_fields=llm_generic_fields, + ) def _capture_generic_memory( self, user_id: int | None, llm_generic_fields: dict | None = None, ) -> None: - context = self._get_user_context(user_id) - if not context: - return - fields = self._extract_generic_memory_fields(llm_generic_fields=llm_generic_fields) - if fields: - # "Memoria generica" e um dict acumulado por usuario. - # Campos novos entram e campos repetidos sobrescrevem valor antigo. - context["generic_memory"].update(fields) - context.setdefault("shared_memory", {}).update(fields) - self._save_user_context(user_id=user_id, context=context) + self._context_manager.capture_generic_memory( + user_id=user_id, + llm_generic_fields=llm_generic_fields, + ) def _capture_tool_result_context( self, @@ -1598,51 +1448,11 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin, RentalFlowMixin): tool_result, user_id: int | None, ) -> None: - context = self._get_user_context(user_id) - if not context: - return - context["last_tool_result"] = { - "tool_name": tool_name, - "result_type": type(tool_result).__name__, - } - if tool_name == "consultar_frota_aluguel" and isinstance(tool_result, list): - sanitized_rental = self._sanitize_rental_results(tool_result[:20]) - context["last_rental_results"] = sanitized_rental - self._store_pending_rental_selection(user_id=user_id, rental_results=sanitized_rental) - if sanitized_rental: - context["selected_rental_vehicle"] = None - context["active_domain"] = "rental" - self._save_user_context(user_id=user_id, context=context) - return - - if tool_name != "consultar_estoque" or not isinstance(tool_result, list): - self._save_user_context(user_id=user_id, context=context) - return - - sanitized: list[dict] = [] - for item in tool_result[:5]: - if not isinstance(item, dict): - continue - try: - vehicle_id = int(item.get("id")) - preco = float(item.get("preco") or 0) - except (TypeError, ValueError): - continue - sanitized.append( - { - "id": vehicle_id, - "modelo": str(item.get("modelo") or "").strip(), - "categoria": str(item.get("categoria") or "").strip(), - "preco": preco, - "budget_relaxed": bool(item.get("budget_relaxed", False)), - } - ) - - context["last_stock_results"] = sanitized - self._store_pending_stock_selection(user_id=user_id, stock_results=sanitized) - if sanitized: - context["selected_vehicle"] = None - self._save_user_context(user_id=user_id, context=context) + self._context_manager.capture_tool_result_context( + tool_name=tool_name, + tool_result=tool_result, + user_id=user_id, + ) def _capture_successful_tool_side_effects( self, @@ -1651,17 +1461,9 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin, RentalFlowMixin): tool_result, user_id: int | None, ) -> None: - if tool_name == "agendar_revisao" and isinstance(arguments, dict): - self._store_last_review_package(user_id=user_id, payload=arguments) - if tool_name in { - "abrir_locacao_aluguel", - "registrar_devolucao_aluguel", - "registrar_pagamento_aluguel", - "registrar_multa_aluguel", - } and isinstance(tool_result, dict): - self._store_last_rental_contract(user_id=user_id, payload=tool_result) - self._capture_tool_result_context( + self._context_manager.capture_successful_tool_side_effects( tool_name=tool_name, + arguments=arguments, tool_result=tool_result, user_id=user_id, ) @@ -1673,72 +1475,15 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin, RentalFlowMixin): tool_result, user_id: int | None, ) -> str | None: - if tool_name != "consultar_estoque" or not isinstance(tool_result, list) or tool_result: - return None - - budget = self._normalize_positive_number((arguments or {}).get("preco_max")) - if not budget: - return None - - relaxed_arguments = dict(arguments or {}) - relaxed_arguments["preco_max"] = max(float(budget) * 1.2, float(budget) + 10000.0) - relaxed_arguments["limite"] = min(max(int((arguments or {}).get("limite") or 5), 1), 5) - relaxed_arguments["ordenar_preco"] = "asc" - - try: - relaxed_result = await self.tool_executor.execute( - "consultar_estoque", - relaxed_arguments, - user_id=user_id, - ) - except HTTPException: - return None - - if not isinstance(relaxed_result, list): - return None - - nearby = [] - for item in relaxed_result: - if not isinstance(item, dict): - continue - try: - price = float(item.get("preco") or 0) - except (TypeError, ValueError): - continue - if price > float(budget): - nearby.append(item) - - if not nearby: - return None - - nearby = [{**item, "budget_relaxed": True} for item in nearby] - - self._capture_tool_result_context( - tool_name="consultar_estoque", - tool_result=nearby, + return await self._context_manager.maybe_build_stock_suggestion_response( + tool_name=tool_name, + arguments=arguments, + tool_result=tool_result, user_id=user_id, ) - budget_label = f"R$ {float(budget):,.0f}".replace(",", ".") - lines = [f"Nao encontrei veiculos ate {budget_label}."] - lines.append("Mas achei algumas opcoes proximas ao seu orcamento:") - for idx, item in enumerate(nearby[:5], start=1): - modelo = str(item.get("modelo") or "N/A") - categoria = str(item.get("categoria") or "N/A") - codigo = item.get("id", "N/A") - preco = f"R$ {float(item.get('preco') or 0):,.2f}".replace(",", "X").replace(".", ",").replace("X", ".") - lines.append(f"{idx}. [{codigo}] {modelo} ({categoria}) - {preco}") - lines.append("Se quiser, responda com o numero da lista ou com o modelo.") - return "\n".join(lines) - def _new_tab_memory(self, user_id: int | None) -> dict: - context = self._get_user_context(user_id) - if not context: - return {} - shared = context.get("shared_memory", {}) - if not isinstance(shared, dict): - return {} - return dict(shared) + return self._context_manager.new_tab_memory(user_id=user_id) def _empty_extraction_payload(self) -> dict: return self.normalizer.empty_extraction_payload() @@ -2539,19 +2284,15 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin, RentalFlowMixin): return self._fallback_format_tool_result("agendar_revisao", tool_result) def _build_router_prompt(self, user_message: str, user_id: int | None) -> str: - conversation_context = self._build_context_summary(user_id=user_id) - return build_router_prompt( + return self._execution_manager.build_router_prompt( user_message=user_message, user_id=user_id, - conversation_context=conversation_context, ) def _build_force_tool_prompt(self, user_message: str, user_id: int | None) -> str: - conversation_context = self._build_context_summary(user_id=user_id) - return build_force_tool_prompt( + return self._execution_manager.build_force_tool_prompt( user_message=user_message, user_id=user_id, - conversation_context=conversation_context, ) def _build_result_prompt( @@ -2561,29 +2302,21 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin, RentalFlowMixin): tool_name: str, tool_result, ) -> str: - conversation_context = self._build_context_summary(user_id=user_id) - return build_result_prompt( + return self._execution_manager.build_result_prompt( user_message=user_message, user_id=user_id, tool_name=tool_name, tool_result=tool_result, - conversation_context=conversation_context, ) def _capture_turn_decision_trace(self, turn_decision: dict | None) -> None: - trace = getattr(self, "_turn_trace", None) - if not isinstance(trace, dict) or not isinstance(turn_decision, dict): - return - trace["intent"] = str(turn_decision.get("intent") or "").strip() or None - trace["domain"] = str(turn_decision.get("domain") or "").strip() or None - trace["action"] = str(turn_decision.get("action") or "").strip() or None + self._execution_manager.capture_turn_decision_trace(turn_decision=turn_decision) def _capture_tool_invocation_trace(self, tool_name: str, arguments: dict | None) -> None: - trace = getattr(self, "_turn_trace", None) - if not isinstance(trace, dict): - return - trace["tool_name"] = str(tool_name or "").strip() or None - trace["tool_arguments"] = mask_sensitive_payload(dict(arguments or {})) if isinstance(arguments, dict) else None + self._execution_manager.capture_tool_invocation_trace( + tool_name=tool_name, + arguments=arguments, + ) def _finalize_turn_history( self, @@ -2593,72 +2326,25 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin, RentalFlowMixin): turn_status: str, error_detail: str | None = None, ) -> None: - history_service = getattr(self, "history_service", None) - if history_service is None: - return - - trace = getattr(self, "_turn_trace", {}) or {} - history_service.record_turn( - request_id=str(trace.get("request_id") or ""), - conversation_id=str(trace.get("conversation_id") or "anonymous"), - user_id=trace.get("user_id"), - user_message=str(user_message or ""), + self._execution_manager.finalize_turn_history( + user_message=user_message, assistant_response=assistant_response, - turn_status=str(turn_status or "completed"), - intent=trace.get("intent"), - domain=trace.get("domain"), - action=trace.get("action"), - tool_name=trace.get("tool_name"), - tool_arguments=trace.get("tool_arguments"), + turn_status=turn_status, error_detail=error_detail, - started_at=trace.get("started_at"), - completed_at=utc_now(), - elapsed_ms=trace.get("elapsed_ms"), ) def _format_turn_error(self, exc: Exception) -> str: - if isinstance(exc, HTTPException): - detail = exc.detail - if isinstance(detail, dict): - return json.dumps(mask_sensitive_payload(detail), ensure_ascii=True, separators=(",", ":"), default=str) - return str(mask_sensitive_text(str(detail))) - return str(mask_sensitive_text(f"{type(exc).__name__}: {exc}")) + return self._execution_manager.format_turn_error(exc) def _log_turn_event(self, event: str, **payload) -> None: - trace = getattr(self, "_turn_trace", {}) or {} - safe_payload = mask_sensitive_payload( - { - "request_id": trace.get("request_id"), - "conversation_id": trace.get("conversation_id"), - **payload, - } - ) - logger.info( - "turn_event=%s payload=%s", - event, - safe_payload, - ) + self._execution_manager.log_turn_event(event, **payload) async def _call_llm_with_trace(self, operation: str, message: str, tools): - started_at = perf_counter() - try: - result = await self.llm.generate_response(message=message, tools=tools) - elapsed_ms = round((perf_counter() - started_at) * 1000, 2) - self._log_turn_event( - "llm_completed", - operation=operation, - elapsed_ms=elapsed_ms, - tool_call=bool(result.get("tool_call")), - ) - return result - except Exception: - elapsed_ms = round((perf_counter() - started_at) * 1000, 2) - self._log_turn_event( - "llm_failed", - operation=operation, - elapsed_ms=elapsed_ms, - ) - raise + return await self._execution_manager.call_llm_with_trace( + operation=operation, + message=message, + tools=tools, + ) def _merge_pending_draft_tool_arguments( self, @@ -2666,32 +2352,11 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin, RentalFlowMixin): arguments: dict, user_id: int | None, ) -> dict: - if user_id is None or not isinstance(arguments, dict): - return dict(arguments or {}) - if not hasattr(self, "state") or self.state is None: - return dict(arguments) - - bucket_map = { - "agendar_revisao": "pending_review_drafts", - "realizar_pedido": "pending_order_drafts", - "cancelar_pedido": "pending_cancel_order_drafts", - "cancelar_agendamento_revisao": "pending_review_management_drafts", - "editar_data_revisao": "pending_review_management_drafts", - } - bucket = bucket_map.get(tool_name) - if not bucket: - return dict(arguments) - - draft = self.state.get_entry(bucket, user_id, expire=True) - if not isinstance(draft, dict): - return dict(arguments) - payload = draft.get("payload") - if not isinstance(payload, dict): - return dict(arguments) - - merged_arguments = dict(payload) - merged_arguments.update(arguments) - return merged_arguments + return self._execution_manager.merge_pending_draft_tool_arguments( + tool_name=tool_name, + arguments=arguments, + user_id=user_id, + ) def _normalize_tool_invocation( self, @@ -2699,48 +2364,18 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin, RentalFlowMixin): arguments: dict | None, user_id: int | None, ) -> tuple[str, dict]: - normalizer = getattr(self, "normalizer", None) - if normalizer is None: - normalizer = EntityNormalizer() - self.normalizer = normalizer - normalized_tool_name = normalizer.normalize_tool_name(tool_name) or str(tool_name or "").strip() - normalized_arguments = normalizer.normalize_tool_arguments(normalized_tool_name, arguments or {}) - normalized_arguments = self._merge_pending_draft_tool_arguments( - tool_name=normalized_tool_name, - arguments=normalized_arguments, + return self._execution_manager.normalize_tool_invocation( + tool_name=tool_name, + arguments=arguments, user_id=user_id, ) - return normalized_tool_name, normalized_arguments async def _execute_tool_with_trace(self, tool_name: str, arguments: dict, user_id: int | None): - tool_name, arguments = self._normalize_tool_invocation( + return await self._execution_manager.execute_tool_with_trace( tool_name=tool_name, arguments=arguments, user_id=user_id, ) - self._capture_tool_invocation_trace(tool_name=tool_name, arguments=arguments) - started_at = perf_counter() - try: - result = await self.tool_executor.execute(tool_name, arguments, user_id=user_id) - elapsed_ms = round((perf_counter() - started_at) * 1000, 2) - self._log_turn_event( - "tool_completed", - tool_name=tool_name, - elapsed_ms=elapsed_ms, - arguments=arguments, - result=result, - ) - return result - except HTTPException as exc: - elapsed_ms = round((perf_counter() - started_at) * 1000, 2) - self._log_turn_event( - "tool_failed", - tool_name=tool_name, - elapsed_ms=elapsed_ms, - arguments=arguments, - error=self.tool_executor.coerce_http_error(exc), - ) - raise async def _render_tool_response_with_fallback( self, @@ -2749,48 +2384,137 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin, RentalFlowMixin): tool_name: str, tool_result, ) -> str: - fallback_response = self._fallback_format_tool_result(tool_name, tool_result) - if self._should_use_deterministic_response(tool_name): - self._log_turn_event( - "tool_response_fallback", - tool_name=tool_name, - reason="deterministic_tool", - ) - return fallback_response + return await self._execution_manager.render_tool_response_with_fallback( + user_message=user_message, + user_id=user_id, + tool_name=tool_name, + tool_result=tool_result, + ) + + def _should_use_deterministic_response(self, tool_name: str) -> bool: + return self._execution_manager.should_use_deterministic_response(tool_name) + + def _normalize_text(self, text: str) -> str: + return self.normalizer.normalize_text(text) + + def _is_low_value_response(self, text: str) -> bool: + return self._execution_manager.is_low_value_response(text) + + def _is_affirmative_message(self, text: str) -> bool: + normalized = self._normalize_text(text).strip().rstrip(".!?,;:") + return normalized in {"sim", "pode", "ok", "confirmo", "aceito", "fechado", "pode sim", "tenho", "tenho sim"} + def _is_negative_message(self, text: str) -> bool: + normalized = self._normalize_text(text).strip().rstrip(".!?,;:") + return ( + normalized in {"nao", "nao quero", "prefiro outro", "outro horario"} + or normalized.startswith("nao") + ) + + def _extract_time_only(self, text: str) -> str | None: + return self.normalizer.extract_hhmm_from_text(text) + + def _merge_date_with_time(self, base_iso: str, new_time_hhmm: str) -> str | None: try: - final_response = await self._call_llm_with_trace( - operation="tool_result_response", - message=self._build_result_prompt( - user_message=user_message, - user_id=user_id, - tool_name=tool_name, - tool_result=tool_result, - ), - tools=[], - ) + base_dt = datetime.fromisoformat((base_iso or "").replace("Z", "+00:00")) + except ValueError: + return None + try: + hour_text, minute_text = new_time_hhmm.split(":") + merged = base_dt.replace(hour=int(hour_text), minute=int(minute_text), second=0, microsecond=0) + return merged.isoformat() except Exception: - self._log_turn_event( - "tool_response_fallback", - tool_name=tool_name, - reason="llm_failure", - ) - return fallback_response + return None - text = (final_response.get("response") or "").strip() - if self._is_low_value_response(text): - self._log_turn_event( - "tool_response_fallback", - tool_name=tool_name, - reason="low_value_response", + def _capture_review_confirmation_suggestion( + self, + tool_name: str, + arguments: dict, + exc: HTTPException, + user_id: int | None, + ) -> None: + if tool_name != "agendar_revisao" or user_id is None or exc.status_code != 409: + return + detail = exc.detail if isinstance(exc.detail, dict) else {} + suggested_iso = str(detail.get("suggested_iso") or "").strip() + if not suggested_iso: + return + payload = dict(arguments or {}) + if not payload.get("placa"): + return + payload["data_hora"] = suggested_iso + self.state.set_entry("pending_review_confirmations", user_id, { + "payload": payload, + "expires_at": utc_now() + timedelta(minutes=PENDING_REVIEW_TTL_MINUTES), + }) + + async def _try_confirm_pending_review( + self, + message: str, + user_id: int | None, + extracted_review_fields: dict | None = None, + ) -> str | None: + if user_id is None: + return None + pending = self.state.get_entry("pending_review_confirmations", user_id, expire=True) + if not pending: + return None + + time_only = self._extract_time_only(message) + if self._is_negative_message(message) or time_only: + extracted = self._normalize_review_fields(extracted_review_fields) + new_data_hora = extracted.get("data_hora") + if not new_data_hora and time_only: + new_data_hora = self._merge_date_with_time(pending["payload"].get("data_hora", ""), time_only) + if not new_data_hora: + self.state.pop_entry("pending_review_confirmations", user_id) + return "Sem problema. Me informe a nova data e hora desejada para a revisao." + + payload = dict(pending["payload"]) + payload["data_hora"] = new_data_hora + try: + tool_result = await self.tool_executor.execute( + "agendar_revisao", + payload, + user_id=user_id, + ) + except HTTPException as exc: + self.state.pop_entry("pending_review_confirmations", user_id) + self._capture_review_confirmation_suggestion( + tool_name="agendar_revisao", + arguments=payload, + exc=exc, + user_id=user_id, + ) + return self._http_exception_detail(exc) + + self._reset_pending_review_states(user_id=user_id) + self._store_last_review_package(user_id=user_id, payload=payload) + return self._fallback_format_tool_result("agendar_revisao", tool_result) + + if not self._is_affirmative_message(message): + return None + try: + tool_result = await self.tool_executor.execute( + "agendar_revisao", + pending["payload"], + user_id=user_id, ) - return fallback_response - return text or fallback_response + except HTTPException as exc: + self.state.pop_entry("pending_review_confirmations", user_id) + return self._http_exception_detail(exc) + + self._reset_pending_review_states(user_id=user_id) + self._store_last_review_package(user_id=user_id, payload=pending.get("payload")) + return self._fallback_format_tool_result("agendar_revisao", tool_result) def _http_exception_detail(self, exc: HTTPException) -> str: - return self.tool_executor.http_exception_detail(exc) + return self._execution_manager.http_exception_detail(exc) def _fallback_format_tool_result(self, tool_name: str, tool_result) -> str: - return self.tool_executor.fallback_format_tool_result(tool_name=tool_name, tool_result=tool_result) + return self._execution_manager.fallback_format_tool_result( + tool_name=tool_name, + tool_result=tool_result, + )