feat(context): enriquecer resumo conversacional e endurecer expiracao do estado

- adiciona helper central de tempo UTC e passa a reutiliza-lo nas rotinas de expiracao, persistencia temporaria e geracao de identificadores operacionais\n- amplia o build_context_summary com fluxo ativo, memoria generica formatada, ultima tool executada, troca de contexto pendente, fila, selecao de estoque e rascunhos de revisao e pedido\n- reaproveita snapshots de fluxo quando uma chave temporaria do bucket nao estiver mais disponivel, mantendo mais contexto util para o modelo\n- padroniza a expiracao do estado em memoria, no Redis e nos fluxos de pedido para reduzir inconsistencias entre turnos e reinicios\n- adiciona testes dedicados para garantir a qualidade do resumo enviado ao modelo em cenarios de revisao, compra e fallback por snapshot
main
parent 0e019824e6
commit cdb36ab964

@ -0,0 +1,5 @@
from datetime import UTC, datetime
def utc_now() -> datetime:
return datetime.now(UTC).replace(tzinfo=None)

@ -1,4 +1,5 @@
from datetime import datetime
from app.core.time_utils import utc_now
from typing import Any
from uuid import uuid4
@ -138,7 +139,7 @@ async def cancelar_pedido(
pedido.status = "Cancelado"
pedido.motivo_cancelamento = motivo
pedido.data_cancelamento = datetime.utcnow()
pedido.data_cancelamento = utc_now()
db.commit()
db.refresh(pedido)
@ -209,7 +210,7 @@ async def realizar_pedido(
field="cpf",
)
numero_pedido = f"PED-{datetime.utcnow().strftime('%Y%m%d%H%M%S')}-{uuid4().hex[:6].upper()}"
numero_pedido = f"PED-{utc_now().strftime('%Y%m%d%H%M%S')}-{uuid4().hex[:6].upper()}"
if user_id is not None:
user = db.query(User).filter(User.id == user_id).first()
if user and user.cpf != cpf_norm:

@ -1,5 +1,6 @@
import re
from datetime import datetime, timedelta
from app.core.time_utils import utc_now
from fastapi import HTTPException
@ -99,7 +100,7 @@ class OrderFlowMixin:
snapshot = self._get_order_flow_snapshot(user_id=user_id, snapshot_key=snapshot_key)
if not snapshot:
return None
if snapshot.get("expires_at") and snapshot["expires_at"] < datetime.utcnow():
if snapshot.get("expires_at") and snapshot["expires_at"] < utc_now():
self._set_order_flow_snapshot(user_id=user_id, snapshot_key=snapshot_key, value=None)
return None
@ -287,7 +288,7 @@ class OrderFlowMixin:
user_id,
{
"payload": sanitized,
"expires_at": datetime.utcnow() + timedelta(minutes=PENDING_ORDER_SELECTION_TTL_MINUTES),
"expires_at": utc_now() + timedelta(minutes=PENDING_ORDER_SELECTION_TTL_MINUTES),
},
)
@ -768,7 +769,7 @@ class OrderFlowMixin:
if draft is None:
draft = {
"payload": {},
"expires_at": datetime.utcnow() + timedelta(minutes=PENDING_ORDER_DRAFT_TTL_MINUTES),
"expires_at": utc_now() + timedelta(minutes=PENDING_ORDER_DRAFT_TTL_MINUTES),
}
draft["payload"].update(extracted)
@ -791,7 +792,7 @@ class OrderFlowMixin:
pending_single_vehicle = None
elif self._is_negative_message(message):
self._clear_pending_single_vehicle_confirmation(user_id=user_id)
draft["expires_at"] = datetime.utcnow() + timedelta(minutes=PENDING_ORDER_DRAFT_TTL_MINUTES)
draft["expires_at"] = utc_now() + timedelta(minutes=PENDING_ORDER_DRAFT_TTL_MINUTES)
self._set_order_flow_entry(
"pending_order_drafts",
user_id,
@ -801,7 +802,7 @@ class OrderFlowMixin:
)
return "Sem problema. Me diga outro modelo ou ajuste o valor para eu buscar novas opcoes."
elif not self._has_explicit_order_request(message):
draft["expires_at"] = datetime.utcnow() + timedelta(minutes=PENDING_ORDER_DRAFT_TTL_MINUTES)
draft["expires_at"] = utc_now() + timedelta(minutes=PENDING_ORDER_DRAFT_TTL_MINUTES)
self._set_order_flow_entry(
"pending_order_drafts",
user_id,
@ -850,7 +851,7 @@ class OrderFlowMixin:
)
return "Para seguir com o pedido, preciso de um CPF valido. Pode me informar novamente?"
draft["expires_at"] = datetime.utcnow() + timedelta(minutes=PENDING_ORDER_DRAFT_TTL_MINUTES)
draft["expires_at"] = utc_now() + timedelta(minutes=PENDING_ORDER_DRAFT_TTL_MINUTES)
self._set_order_flow_entry(
"pending_order_drafts",
user_id,
@ -889,7 +890,7 @@ class OrderFlowMixin:
error = self.tool_executor.coerce_http_error(exc)
if error.get("retryable") and error.get("field"):
draft["payload"].pop(str(error["field"]), None)
draft["expires_at"] = datetime.utcnow() + timedelta(minutes=PENDING_ORDER_DRAFT_TTL_MINUTES)
draft["expires_at"] = utc_now() + timedelta(minutes=PENDING_ORDER_DRAFT_TTL_MINUTES)
self._set_order_flow_entry(
"pending_order_drafts",
user_id,
@ -972,7 +973,7 @@ class OrderFlowMixin:
if draft is None:
draft = {
"payload": {},
"expires_at": datetime.utcnow() + timedelta(minutes=PENDING_CANCEL_ORDER_DRAFT_TTL_MINUTES),
"expires_at": utc_now() + timedelta(minutes=PENDING_CANCEL_ORDER_DRAFT_TTL_MINUTES),
}
if (
@ -987,7 +988,7 @@ class OrderFlowMixin:
extracted["motivo"] = free_text
draft["payload"].update(extracted)
draft["expires_at"] = datetime.utcnow() + timedelta(minutes=PENDING_CANCEL_ORDER_DRAFT_TTL_MINUTES)
draft["expires_at"] = utc_now() + timedelta(minutes=PENDING_CANCEL_ORDER_DRAFT_TTL_MINUTES)
self._set_order_flow_entry(
"pending_cancel_order_drafts",
user_id,
@ -1010,7 +1011,7 @@ class OrderFlowMixin:
error = self.tool_executor.coerce_http_error(exc)
if error.get("retryable") and error.get("field"):
draft["payload"].pop(str(error["field"]), None)
draft["expires_at"] = datetime.utcnow() + timedelta(minutes=PENDING_CANCEL_ORDER_DRAFT_TTL_MINUTES)
draft["expires_at"] = utc_now() + timedelta(minutes=PENDING_CANCEL_ORDER_DRAFT_TTL_MINUTES)
self._set_order_flow_entry(
"pending_cancel_order_drafts",
user_id,

@ -1,5 +1,6 @@
import re
from datetime import datetime, timedelta
from app.core.time_utils import utc_now
from typing import TYPE_CHECKING
from app.services.orchestration.orchestrator_config import (
@ -12,6 +13,68 @@ from app.services.orchestration.orchestrator_config import (
if TYPE_CHECKING:
from app.services.orchestration.orquestrador_service import OrquestradorService
GENERIC_MEMORY_SUMMARY_FIELDS = (
"placa",
"cpf",
"orcamento_max",
"perfil_veiculo",
)
REVIEW_SUMMARY_FIELDS = (
"placa",
"data_hora",
"data_hora_base",
"modelo",
"ano",
"km",
"revisao_previa_concessionaria",
)
REVIEW_MANAGEMENT_SUMMARY_FIELDS = (
"protocolo",
"nova_data_hora",
"motivo",
)
ORDER_SUMMARY_FIELDS = (
"cpf",
"vehicle_id",
"modelo_veiculo",
"valor_veiculo",
)
CANCEL_ORDER_SUMMARY_FIELDS = (
"numero_pedido",
"motivo",
)
CONTEXT_FIELD_LABELS = {
"placa": "placa",
"cpf": "cpf",
"orcamento_max": "orcamento",
"perfil_veiculo": "perfil",
"data_hora": "data/hora",
"data_hora_base": "data base",
"modelo": "modelo",
"ano": "ano",
"km": "km",
"revisao_previa_concessionaria": "revisao previa na concessionaria",
"protocolo": "protocolo",
"nova_data_hora": "nova data/hora",
"motivo": "motivo",
"vehicle_id": "vehicle_id",
"modelo_veiculo": "modelo_veiculo",
"valor_veiculo": "valor_veiculo",
"numero_pedido": "numero_pedido",
}
ACTIVE_TASK_LABELS = {
"review_schedule": "agendamento de revisao",
"review_management": "gestao de revisao",
"order_create": "criacao de pedido",
"order_cancel": "cancelamento de pedido",
}
# essa classe é responsável por controlar qual o assunto está ativo na conversa, se existe fluxo aberto, se o usuário mandou dois pedidos ao mesmo tempo...
class ConversationPolicy:
def __init__(self, service: "OrquestradorService"):
@ -74,7 +137,7 @@ class ConversationPolicy:
"domain": domain,
"message": (order_message or "").strip(),
"memory_seed": dict(memory_seed or self.service._new_tab_memory(user_id=user_id)),
"created_at": datetime.utcnow().isoformat(),
"created_at": utc_now().isoformat(),
}
)
self._save_context(user_id=user_id, context=context)
@ -233,7 +296,7 @@ class ConversationPolicy:
}
for order in orders[:2]
],
"expires_at": datetime.utcnow() + timedelta(minutes=PENDING_ORDER_SELECTION_TTL_MINUTES),
"expires_at": utc_now() + timedelta(minutes=PENDING_ORDER_SELECTION_TTL_MINUTES),
}
self._save_context(user_id=user_id, context=context)
@ -480,7 +543,7 @@ class ConversationPolicy:
pending = context.get("pending_order_selection")
if not isinstance(pending, dict):
return None
if pending.get("expires_at") and pending["expires_at"] < datetime.utcnow():
if pending.get("expires_at") and pending["expires_at"] < utc_now():
context["pending_order_selection"] = None
self._save_context(user_id=user_id, context=context)
return None
@ -611,7 +674,7 @@ class ConversationPolicy:
"target_domain": next_order["domain"],
"queued_message": next_order["message"],
"memory_seed": dict(next_order.get("memory_seed") or self.service._new_tab_memory(user_id=user_id)),
"expires_at": datetime.utcnow() + timedelta(minutes=15),
"expires_at": utc_now() + timedelta(minutes=15),
}
self._save_context(user_id=user_id, context=context)
transition = self.build_next_order_transition(next_order["domain"])
@ -691,7 +754,7 @@ class ConversationPolicy:
pending_switch = context.get("pending_switch")
if not isinstance(pending_switch, dict):
return None
if pending_switch.get("expires_at") and pending_switch["expires_at"] < datetime.utcnow():
if pending_switch.get("expires_at") and pending_switch["expires_at"] < utc_now():
context["pending_switch"] = None
self._save_context(user_id=user_id, context=context)
return None
@ -771,7 +834,7 @@ class ConversationPolicy:
return None
pending_switch = context.get("pending_switch")
if pending_switch:
if pending_switch["expires_at"] < datetime.utcnow():
if pending_switch["expires_at"] < utc_now():
context["pending_switch"] = None
self._save_context(user_id=user_id, context=context)
elif (
@ -790,7 +853,7 @@ class ConversationPolicy:
return "Perfeito, vamos continuar no fluxo atual."
pending_order_selection = context.get("pending_order_selection")
if pending_order_selection and pending_order_selection.get("expires_at") < datetime.utcnow():
if pending_order_selection and pending_order_selection.get("expires_at") < utc_now():
context["pending_order_selection"] = None
self._save_context(user_id=user_id, context=context)
@ -808,7 +871,7 @@ class ConversationPolicy:
context["pending_switch"] = {
"source_domain": current_domain,
"target_domain": target_domain_hint,
"expires_at": datetime.utcnow() + timedelta(minutes=15),
"expires_at": utc_now() + timedelta(minutes=15),
}
self._save_context(user_id=user_id, context=context)
return self.render_context_switch_confirmation(source_domain=current_domain, target_domain=target_domain_hint)
@ -854,6 +917,79 @@ class ConversationPolicy:
f"{self.render_domain_onboarding_prompt(target_domain)}"
)
def _field_label(self, field_name: str) -> str:
return CONTEXT_FIELD_LABELS.get(field_name, field_name)
def _task_label(self, active_task: str | None) -> str:
task_key = str(active_task or "").strip().lower()
return ACTIVE_TASK_LABELS.get(task_key, str(active_task or "").strip())
def _format_summary_value(self, field_name: str, value) -> str:
if isinstance(value, bool):
return "sim" if value else "nao"
if isinstance(value, (int, float)) and field_name in {"orcamento_max", "valor_veiculo", "preco"}:
amount = float(value)
formatted = f"{amount:,.0f}" if amount.is_integer() else f"{amount:,.2f}"
formatted = formatted.replace(",", "X").replace(".", ",").replace("X", ".")
return f"R$ {formatted}"
if isinstance(value, list):
rendered_items = [
self._format_summary_value(field_name=field_name, value=item)
for item in value
if item not in (None, "", [], {})
]
if not rendered_items:
return ""
if len(rendered_items) <= 3:
return ", ".join(rendered_items)
return f"{', '.join(rendered_items[:3])}, ..."
if isinstance(value, str):
compact = value.strip()
if len(compact) > 80:
return f"{compact[:77]}..."
return compact
return str(value)
def _summarize_payload(self, payload: dict | None, field_order: tuple[str, ...]) -> str:
if not isinstance(payload, dict):
return ""
fragments: list[str] = []
for field_name in field_order:
value = payload.get(field_name)
if value in (None, "", [], {}):
continue
rendered = self._format_summary_value(field_name=field_name, value=value)
if not rendered:
continue
fragments.append(f"{self._field_label(field_name)}={rendered}")
return ", ".join(fragments)
def _summarize_missing_fields(self, payload: dict | None, required_fields: tuple[str, ...]) -> str:
if not isinstance(payload, dict):
return ", ".join(self._field_label(field_name) for field_name in required_fields)
missing_fields = [self._field_label(field_name) for field_name in required_fields if field_name not in payload]
return ", ".join(missing_fields)
def _get_pending_entry(self, user_id: int | None, bucket: str, snapshot_key: str | None = None) -> dict | None:
entry = self.service.state.get_entry(bucket, user_id, expire=True)
if entry:
return entry
if user_id is None:
return None
context = self.service._get_user_context(user_id)
if not isinstance(context, dict):
return None
snapshots = context.get("flow_snapshots")
if not isinstance(snapshots, dict) or not snapshot_key:
return None
snapshot = snapshots.get(snapshot_key)
if not isinstance(snapshot, dict):
return None
expires_at = snapshot.get("expires_at")
if isinstance(expires_at, datetime) and expires_at < utc_now():
return None
return snapshot
# Serve para depuração, observabilidade ou até para alimentar outro componente com um resumo do estado atual.
def build_context_summary(self, user_id: int | None) -> str:
@ -861,16 +997,132 @@ class ConversationPolicy:
if not context:
return "Contexto de conversa: sem contexto ativo."
summary = [f"Contexto de conversa ativo: {self.domain_label(context.get('active_domain', 'general'))}."]
active_task = str(context.get("active_task") or "").strip()
if active_task:
summary.append(f"Fluxo ativo: {self._task_label(active_task)}.")
memory = context.get("generic_memory", {})
if memory:
summary.append(f"Memoria generica temporaria: {memory}.")
summarized_memory = self._summarize_payload(memory, GENERIC_MEMORY_SUMMARY_FIELDS)
if summarized_memory:
summary.append(f"Memoria generica temporaria: {summarized_memory}.")
selected_vehicle = context.get("selected_vehicle")
if isinstance(selected_vehicle, dict) and selected_vehicle.get("modelo"):
summary.append(f"Veiculo selecionado para compra: {selected_vehicle.get('modelo')}.")
stock_results = context.get("last_stock_results") or []
if isinstance(stock_results, list) and stock_results:
summary.append(f"Ultima consulta de estoque com {len(stock_results)} opcao(oes) disponivel(is).")
last_tool_result = context.get("last_tool_result")
if isinstance(last_tool_result, dict) and last_tool_result.get("tool_name"):
tool_name = str(last_tool_result.get("tool_name") or "").strip()
result_type = str(last_tool_result.get("result_type") or "").strip()
if tool_name and result_type:
summary.append(f"Ultima tool executada: {tool_name} ({result_type}).")
elif tool_name:
summary.append(f"Ultima tool executada: {tool_name}.")
pending_order_selection = context.get("pending_order_selection")
if isinstance(pending_order_selection, dict):
orders = pending_order_selection.get("orders")
if isinstance(orders, list) and orders:
summary.append(f"Aguardando escolha entre {len(orders)} pedido(s) detectado(s) na mesma mensagem.")
pending_switch = context.get("pending_switch")
if isinstance(pending_switch, dict):
target_domain = str(pending_switch.get("target_domain") or "general")
summary.append(f"Troca de contexto pendente para {self.domain_label(target_domain)}.")
order_queue = context.get("order_queue", [])
if order_queue:
summary.append(f"Fila de pedidos pendentes: {len(order_queue)}.")
pending_single_vehicle = context.get("pending_single_vehicle_confirmation")
if isinstance(pending_single_vehicle, dict) and pending_single_vehicle.get("modelo"):
summary.append(
f"Aguardando confirmacao explicita do veiculo {pending_single_vehicle.get('modelo')}."
)
review_draft = self._get_pending_entry(user_id, "pending_review_drafts", "review_schedule")
if isinstance(review_draft, dict):
payload = review_draft.get("payload")
known_fields = self._summarize_payload(payload, REVIEW_SUMMARY_FIELDS)
missing_fields = self._summarize_missing_fields(payload, REVIEW_REQUIRED_FIELDS)
draft_summary = "Rascunho aberto de agendamento de revisao."
if known_fields:
draft_summary = f"{draft_summary} Dados atuais: {known_fields}."
if missing_fields:
draft_summary = f"{draft_summary} Faltando: {missing_fields}."
summary.append(draft_summary)
review_management_draft = self._get_pending_entry(
user_id,
"pending_review_management_drafts",
"review_management",
)
if isinstance(review_management_draft, dict):
action = str(review_management_draft.get("action") or "cancel").strip().lower()
payload = review_management_draft.get("payload")
known_fields = self._summarize_payload(payload, REVIEW_MANAGEMENT_SUMMARY_FIELDS)
if action == "reschedule":
missing_fields = self._summarize_missing_fields(payload, ("protocolo", "nova_data_hora"))
draft_summary = "Rascunho aberto de remarcacao de revisao."
else:
missing_fields = self._summarize_missing_fields(payload, ("protocolo",))
draft_summary = "Rascunho aberto de cancelamento de revisao."
if known_fields:
draft_summary = f"{draft_summary} Dados atuais: {known_fields}."
if missing_fields:
draft_summary = f"{draft_summary} Faltando: {missing_fields}."
summary.append(draft_summary)
review_confirmation = self._get_pending_entry(
user_id,
"pending_review_confirmations",
"review_confirmation",
)
if isinstance(review_confirmation, dict):
payload = review_confirmation.get("payload")
known_fields = self._summarize_payload(payload, REVIEW_SUMMARY_FIELDS)
confirmation_summary = "Confirmacao pendente de horario sugerido para revisao."
if known_fields:
confirmation_summary = f"{confirmation_summary} Dados sugeridos: {known_fields}."
summary.append(confirmation_summary)
review_reuse = self._get_pending_entry(
user_id,
"pending_review_reuse_confirmations",
"review_reuse_confirmation",
)
if isinstance(review_reuse, dict):
payload = review_reuse.get("payload")
known_fields = self._summarize_payload(payload, REVIEW_SUMMARY_FIELDS)
reuse_summary = "Confirmacao pendente para reutilizar dados da ultima revisao."
if known_fields:
reuse_summary = f"{reuse_summary} Dados reaproveitaveis: {known_fields}."
summary.append(reuse_summary)
order_draft = self._get_pending_entry(user_id, "pending_order_drafts", "order_create")
if isinstance(order_draft, dict):
payload = order_draft.get("payload")
known_fields = self._summarize_payload(payload, ORDER_SUMMARY_FIELDS)
missing_fields = self._summarize_missing_fields(payload, ORDER_REQUIRED_FIELDS)
draft_summary = "Rascunho aberto de criacao de pedido."
if known_fields:
draft_summary = f"{draft_summary} Dados atuais: {known_fields}."
if missing_fields:
draft_summary = f"{draft_summary} Faltando: {missing_fields}."
summary.append(draft_summary)
cancel_order_draft = self._get_pending_entry(user_id, "pending_cancel_order_drafts", "order_cancel")
if isinstance(cancel_order_draft, dict):
payload = cancel_order_draft.get("payload")
known_fields = self._summarize_payload(payload, CANCEL_ORDER_SUMMARY_FIELDS)
missing_fields = self._summarize_missing_fields(payload, CANCEL_ORDER_REQUIRED_FIELDS)
draft_summary = "Rascunho aberto de cancelamento de pedido."
if known_fields:
draft_summary = f"{draft_summary} Dados atuais: {known_fields}."
if missing_fields:
draft_summary = f"{draft_summary} Faltando: {missing_fields}."
summary.append(draft_summary)
stock_selection = self._get_pending_entry(user_id, "pending_stock_selections")
if isinstance(stock_selection, dict):
payload = stock_selection.get("payload")
if isinstance(payload, list) and payload:
summary.append(f"Aguardando escolha de veiculo em {len(payload)} opcao(oes) de estoque.")
return " ".join(summary)

@ -1,4 +1,5 @@
from datetime import datetime, timedelta
from app.core.time_utils import utc_now
from app.services.orchestration.conversation_state_repository import ConversationStateRepository
@ -20,7 +21,7 @@ class ConversationStateStore(ConversationStateRepository):
def upsert_user_context(self, user_id: int | None, ttl_minutes: int) -> None:
if user_id is None:
return
now = datetime.utcnow()
now = utc_now()
context = self.user_contexts.get(user_id)
if context and context["expires_at"] >= now:
context["expires_at"] = now + timedelta(minutes=ttl_minutes)
@ -47,7 +48,7 @@ class ConversationStateStore(ConversationStateRepository):
context = self.user_contexts.get(user_id)
if not context:
return None
if context["expires_at"] < datetime.utcnow():
if context["expires_at"] < utc_now():
self.user_contexts.pop(user_id, None)
return None
return context
@ -64,7 +65,7 @@ class ConversationStateStore(ConversationStateRepository):
entry = entries.get(user_id)
if not entry:
return None
if expire and entry.get("expires_at") and entry["expires_at"] < datetime.utcnow():
if expire and entry.get("expires_at") and entry["expires_at"] < utc_now():
entries.pop(user_id, None)
return None
return entry

@ -1,5 +1,6 @@
import json
from datetime import datetime, timedelta
from app.core.time_utils import utc_now
from typing import Any
from redis import Redis
@ -23,7 +24,7 @@ class RedisConversationStateRepository(ConversationStateRepository):
if user_id is None:
return
now = datetime.utcnow()
now = utc_now()
key = self._bucket_key("user_contexts", user_id)
context = self._load(key)
@ -68,7 +69,7 @@ class RedisConversationStateRepository(ConversationStateRepository):
payload = dict(context)
ttl_seconds = self._ttl_from_entry(payload)
if ttl_seconds is None:
payload["expires_at"] = datetime.utcnow().replace(microsecond=0) + self._minutes_delta(self.default_ttl_minutes)
payload["expires_at"] = utc_now().replace(microsecond=0) + self._minutes_delta(self.default_ttl_minutes)
ttl_seconds = self.default_ttl_minutes * 60
self._save(self._bucket_key("user_contexts", user_id), payload, ttl_seconds=ttl_seconds)
@ -117,13 +118,13 @@ class RedisConversationStateRepository(ConversationStateRepository):
expires_at = entry.get("expires_at")
if not isinstance(expires_at, datetime):
return True
return expires_at >= (now or datetime.utcnow())
return expires_at >= (now or utc_now())
def _ttl_from_entry(self, entry: dict) -> int | None:
expires_at = entry.get("expires_at")
if not isinstance(expires_at, datetime):
return None
delta = int((expires_at - datetime.utcnow()).total_seconds())
delta = int((expires_at - utc_now()).total_seconds())
return max(1, delta)
def _minutes_delta(self, minutes: int):

@ -0,0 +1,205 @@
import os
import unittest
from datetime import datetime, timedelta
from app.core.time_utils import utc_now
os.environ.setdefault("DEBUG", "false")
from app.services.orchestration.conversation_policy import ConversationPolicy
from app.services.orchestration.entity_normalizer import EntityNormalizer
class FakeState:
def __init__(self, entries=None, contexts=None):
self.entries = entries or {}
self.contexts = contexts or {}
def get_entry(self, bucket: str, user_id: int | None, *, expire: bool = False):
if user_id is None:
return None
return self.entries.get(bucket, {}).get(user_id)
def get_user_context(self, user_id: int | None):
if user_id is None:
return None
return self.contexts.get(user_id)
def save_user_context(self, user_id: int | None, context: dict):
if user_id is None:
return
self.contexts[user_id] = context
class FakeService:
def __init__(self, state):
self.state = state
self.normalizer = EntityNormalizer()
def _get_user_context(self, user_id: int | None):
return self.state.get_user_context(user_id)
def _save_user_context(self, user_id: int | None, context: dict | None) -> None:
if user_id is None or not isinstance(context, dict):
return
self.state.save_user_context(user_id, context)
class ContextSummaryTests(unittest.TestCase):
def test_build_context_summary_describes_open_review_flow(self):
now = utc_now()
state = FakeState(
entries={
"pending_review_drafts": {
21: {
"payload": {
"placa": "ABC1234",
"modelo": "Onix",
"ano": 2024,
},
"expires_at": now + timedelta(minutes=15),
}
},
"pending_review_confirmations": {
21: {
"payload": {
"placa": "ABC1234",
"data_hora": "14/03/2026 16:30",
},
"expires_at": now + timedelta(minutes=15),
}
},
},
contexts={
21: {
"active_domain": "review",
"active_task": "review_schedule",
"generic_memory": {
"placa": "ABC1234",
"orcamento_max": 70000,
"perfil_veiculo": ["suv"],
},
"shared_memory": {},
"order_queue": [{"domain": "sales", "message": "quero comprar um carro"}],
"pending_order_selection": {
"orders": [
{"domain": "review", "message": "agendar revisao"},
{"domain": "sales", "message": "comprar um carro"},
]
},
"pending_switch": {"target_domain": "sales"},
"last_stock_results": [],
"selected_vehicle": None,
"last_tool_result": {"tool_name": "listar_agendamentos_revisao", "result_type": "list"},
}
},
)
summary = ConversationPolicy(service=FakeService(state)).build_context_summary(21)
self.assertIn("Fluxo ativo: agendamento de revisao.", summary)
self.assertIn("Memoria generica temporaria: placa=ABC1234, orcamento=R$ 70.000, perfil=suv.", summary)
self.assertIn("Ultima tool executada: listar_agendamentos_revisao (list).", summary)
self.assertIn("Aguardando escolha entre 2 pedido(s) detectado(s) na mesma mensagem.", summary)
self.assertIn("Troca de contexto pendente para compra de veiculo.", summary)
self.assertIn("Fila de pedidos pendentes: 1.", summary)
self.assertIn("Rascunho aberto de agendamento de revisao.", summary)
self.assertIn("Dados atuais: placa=ABC1234, modelo=Onix, ano=2024.", summary)
self.assertIn("Faltando: data/hora, km, revisao previa na concessionaria.", summary)
self.assertIn("Confirmacao pendente de horario sugerido para revisao.", summary)
self.assertIn("Dados sugeridos: placa=ABC1234, data/hora=14/03/2026 16:30.", summary)
def test_build_context_summary_describes_open_order_flow(self):
now = utc_now()
state = FakeState(
entries={
"pending_order_drafts": {
10: {
"payload": {"cpf": "12345678909"},
"expires_at": now + timedelta(minutes=15),
}
},
"pending_stock_selections": {
10: {
"payload": [
{
"id": 1,
"modelo": "Honda Civic 2021",
"categoria": "sedan",
"preco": 48500.0,
}
],
"expires_at": now + timedelta(minutes=15),
}
},
},
contexts={
10: {
"active_domain": "sales",
"active_task": "order_create",
"generic_memory": {
"cpf": "12345678909",
"orcamento_max": 80000,
},
"shared_memory": {},
"order_queue": [],
"pending_order_selection": None,
"pending_switch": None,
"last_stock_results": [
{"id": 1, "modelo": "Honda Civic 2021", "categoria": "sedan", "preco": 48500.0},
{"id": 2, "modelo": "Toyota Yaris 2020", "categoria": "hatch", "preco": 49900.0},
],
"selected_vehicle": {"id": 1, "modelo": "Honda Civic 2021"},
"pending_single_vehicle_confirmation": {"id": 1, "modelo": "Honda Civic 2021"},
"last_tool_result": {"tool_name": "consultar_estoque", "result_type": "list"},
}
},
)
summary = ConversationPolicy(service=FakeService(state)).build_context_summary(10)
self.assertIn("Fluxo ativo: criacao de pedido.", summary)
self.assertIn("Memoria generica temporaria: cpf=12345678909, orcamento=R$ 80.000.", summary)
self.assertIn("Veiculo selecionado para compra: Honda Civic 2021.", summary)
self.assertIn("Ultima consulta de estoque com 2 opcao(oes) disponivel(is).", summary)
self.assertIn("Ultima tool executada: consultar_estoque (list).", summary)
self.assertIn("Aguardando confirmacao explicita do veiculo Honda Civic 2021.", summary)
self.assertIn("Rascunho aberto de criacao de pedido.", summary)
self.assertIn("Dados atuais: cpf=12345678909.", summary)
self.assertIn("Faltando: vehicle_id.", summary)
self.assertIn("Aguardando escolha de veiculo em 1 opcao(oes) de estoque.", summary)
def test_build_context_summary_uses_snapshot_when_bucket_is_missing(self):
now = utc_now()
state = FakeState(
contexts={
7: {
"active_domain": "review",
"active_task": "review_schedule",
"generic_memory": {"placa": "ABC1C23"},
"shared_memory": {},
"flow_snapshots": {
"review_schedule": {
"payload": {
"placa": "ABC1C23",
"modelo": "Onix",
"ano": 2024,
},
"expires_at": now + timedelta(minutes=15),
}
},
"order_queue": [],
"pending_order_selection": None,
"pending_switch": None,
"last_stock_results": [],
"selected_vehicle": None,
}
}
)
summary = ConversationPolicy(service=FakeService(state)).build_context_summary(7)
self.assertIn("Fluxo ativo: agendamento de revisao.", summary)
self.assertIn("Rascunho aberto de agendamento de revisao.", summary)
self.assertIn("Dados atuais: placa=ABC1C23, modelo=Onix, ano=2024.", summary)
self.assertIn("Faltando: data/hora, km, revisao previa na concessionaria.", summary)
if __name__ == "__main__":
unittest.main()
Loading…
Cancel
Save