Merge branch 'chore/observability-latency-markers'

main
commit a692a7023e

@ -11,6 +11,7 @@ class Settings(BaseSettings):
google_project_id: str
google_location: str = "us-central1"
vertex_model_name: str = "gemini-2.5-pro"
vertex_bundle_model_name: str = "gemini-2.5-pro"
# Tools database (MySQL)
db_host: str = "127.0.0.1"

@ -3,6 +3,7 @@ import logging
import os
import tempfile
from datetime import timedelta
from time import perf_counter
from typing import Any, Dict, List
import aiohttp
@ -153,6 +154,12 @@ class TelegramSatelliteService:
self._chat_workers_lock = asyncio.Lock()
self._chat_processing_semaphore = asyncio.Semaphore(TELEGRAM_MAX_CONCURRENT_CHATS)
def _log_telegram_event(self, event: str, **payload) -> None:
    """Emit one structured INFO log line for a Telegram event, masking sensitive payload fields first."""
    masked_payload = mask_sensitive_payload(payload)
    logger.info("telegram_event=%s payload=%s", event, masked_payload)
def _elapsed_ms(self, started_at: float) -> float:
return round((perf_counter() - started_at) * 1000, 2)
async def run(self) -> None:
"""Inicia loop de long polling para consumir atualizacoes do bot."""
logger.info("Telegram satellite iniciado com long polling.")
@ -292,9 +299,19 @@ class TelegramSatelliteService:
if queue is None:
queue = asyncio.Queue()
self._chat_queues[chat_id] = queue
update["_orq_enqueued_at_perf"] = perf_counter()
queue.put_nowait(update)
queue_size = queue.qsize()
worker = self._chat_workers.get(chat_id)
worker_active = worker is not None and not worker.done()
self._log_telegram_event(
"chat_update_enqueued",
chat_id=chat_id,
update_id=update.get("update_id"),
queue_size=queue_size,
worker_active=worker_active,
)
if worker is None or worker.done():
self._chat_workers[chat_id] = asyncio.create_task(
self._run_chat_worker(
@ -315,6 +332,19 @@ class TelegramSatelliteService:
try:
while True:
update = await queue.get()
queued_at_perf = update.get("_orq_enqueued_at_perf")
queue_wait_ms = (
round((perf_counter() - queued_at_perf) * 1000, 2)
if isinstance(queued_at_perf, (int, float))
else None
)
self._log_telegram_event(
"chat_update_dequeued",
chat_id=chat_id,
update_id=update.get("update_id"),
queue_wait_ms=queue_wait_ms,
queue_size=queue.qsize(),
)
try:
async with self._chat_processing_semaphore:
await self._handle_update(session=session, update=update)
@ -410,12 +440,28 @@ class TelegramSatelliteService:
if offset is not None:
payload["offset"] = offset
started_at = perf_counter()
async with session.post(f"{self.base_url}/getUpdates", json=payload) as response:
data = await response.json()
if not data.get("ok"):
self._log_telegram_event(
"get_updates_failed",
offset=offset,
elapsed_ms=self._elapsed_ms(started_at),
response=data,
)
logger.warning("Falha em getUpdates: %s", data)
return []
return data.get("result", [])
updates = data.get("result", [])
if updates:
self._log_telegram_event(
"get_updates_completed",
offset=offset,
updates_count=len(updates),
elapsed_ms=self._elapsed_ms(started_at),
)
return updates
async def _handle_update(
self,
@ -423,11 +469,13 @@ class TelegramSatelliteService:
update: Dict[str, Any],
) -> None:
"""Processa uma atualizacao recebida e envia resposta ao chat."""
started_at = perf_counter()
message = update.get("message", {})
text = message.get("text") or message.get("caption")
chat = message.get("chat", {})
chat_id = chat.get("id")
sender = message.get("from", {})
update_id = update.get("update_id")
if not chat_id:
return
@ -441,11 +489,29 @@ class TelegramSatelliteService:
self._build_update_idempotency_key(update),
)
await self._deliver_message(session=session, chat_id=chat_id, text=cached_answer)
self._log_telegram_event(
"update_completed",
update_id=update_id,
chat_id=chat_id,
cached_hit=True,
elapsed_ms=self._elapsed_ms(started_at),
input_chars=len(str(text or "")),
answer_chars=len(cached_answer),
image_count=0,
)
return
image_attachments = await self._extract_image_attachments(session=session, message=message)
image_count = len(image_attachments)
if not text and not image_attachments:
self._log_telegram_event(
"update_ignored",
update_id=update_id,
chat_id=chat_id,
reason="empty_text_and_no_image",
elapsed_ms=self._elapsed_ms(started_at),
)
return
try:
@ -463,10 +529,19 @@ class TelegramSatelliteService:
answer = "Nao consegui processar sua solicitacao agora. Tente novamente em instantes."
self._store_processed_update(update=update, answer=answer)
update_id = update.get("update_id")
if isinstance(update_id, int):
self._persist_last_processed_update_id(update_id)
await self._deliver_message(session=session, chat_id=chat_id, text=answer)
self._log_telegram_event(
"update_completed",
update_id=update_id,
chat_id=chat_id,
cached_hit=False,
elapsed_ms=self._elapsed_ms(started_at),
input_chars=len(str(text or "")),
answer_chars=len(str(answer or "")),
image_count=image_count,
)
async def _deliver_message(
self,
@ -488,17 +563,24 @@ class TelegramSatelliteService:
text: str,
) -> None:
"""Envia mensagem de texto para o chat informado no Telegram."""
for chunk_index, chunk in enumerate(_split_telegram_text(text), start=1):
chunks = _split_telegram_text(text)
started_at = perf_counter()
total_attempts = 0
successful_chunks = 0
for chunk_index, chunk in enumerate(chunks, start=1):
payload = {
"chat_id": chat_id,
"text": chunk,
}
for attempt in range(1, TELEGRAM_SEND_MESSAGE_MAX_ATTEMPTS + 1):
total_attempts += 1
try:
async with session.post(f"{self.base_url}/sendMessage", json=payload) as response:
data = await response.json()
if not data.get("ok"):
logger.warning("Falha em sendMessage: %s", data)
else:
successful_chunks += 1
break
except (aiohttp.ClientError, asyncio.TimeoutError, OSError) as exc:
if attempt >= TELEGRAM_SEND_MESSAGE_MAX_ATTEMPTS:
@ -522,6 +604,16 @@ class TelegramSatelliteService:
)
await asyncio.sleep(delay_seconds)
self._log_telegram_event(
"send_message_completed",
chat_id=chat_id,
chunk_count=len(chunks),
successful_chunks=successful_chunks,
total_attempts=total_attempts,
elapsed_ms=self._elapsed_ms(started_at),
text_chars=len(str(text or "")),
)
# Processa uma mensagem do Telegram e injeta o texto extraido de imagens quando houver.
async def _process_message(
self,
@ -531,22 +623,49 @@ class TelegramSatelliteService:
image_attachments: List[Dict[str, Any]] | None = None,
) -> str:
"""Encaminha mensagem ao orquestrador com usuario identificado e retorna resposta."""
started_at = perf_counter()
message_text = text
image_processing_ms = None
if image_attachments:
image_started_at = perf_counter()
image_message = await self._build_orchestration_message_from_image(
caption=text,
image_attachments=image_attachments,
)
image_processing_ms = self._elapsed_ms(image_started_at)
if self._is_image_analysis_failure_message(image_message):
self._log_telegram_event(
"process_message_completed",
chat_id=chat_id,
elapsed_ms=self._elapsed_ms(started_at),
image_processing_ms=image_processing_ms,
orchestration_offloaded=False,
used_image_attachments=True,
input_chars=len(str(text or "")),
response_chars=len(image_message),
)
return image_message
message_text = image_message
return await asyncio.to_thread(
orchestration_started_at = perf_counter()
answer = await asyncio.to_thread(
self._run_blocking_orchestration_turn,
message_text=message_text,
sender=sender,
chat_id=chat_id,
)
self._log_telegram_event(
"process_message_completed",
chat_id=chat_id,
elapsed_ms=self._elapsed_ms(started_at),
image_processing_ms=image_processing_ms,
orchestration_ms=self._elapsed_ms(orchestration_started_at),
orchestration_offloaded=True,
used_image_attachments=bool(image_attachments),
input_chars=len(message_text),
response_chars=len(str(answer or "")),
)
return answer
def _run_blocking_orchestration_turn(
self,
@ -559,9 +678,11 @@ class TelegramSatelliteService:
Executa o turno do orquestrador fora do loop async principal.
Isso isola sessoes SQLAlchemy sincronas e outras operacoes bloqueantes.
"""
started_at = perf_counter()
tools_db = SessionLocal()
mock_db = SessionMockLocal()
try:
user_resolution_started_at = perf_counter()
user_service = UserService(mock_db)
external_id = str(sender.get("id") or chat_id)
first_name = (sender.get("first_name") or "").strip()
@ -575,12 +696,30 @@ class TelegramSatelliteService:
name=display_name,
username=username,
)
user_resolution_ms = self._elapsed_ms(user_resolution_started_at)
service_init_started_at = perf_counter()
service = OrquestradorService(
tools_db,
state_repository=self.state,
)
return asyncio.run(service.handle_message(message=message_text, user_id=user.id))
service_init_ms = self._elapsed_ms(service_init_started_at)
orchestration_started_at = perf_counter()
response = asyncio.run(service.handle_message(message=message_text, user_id=user.id))
orchestration_ms = self._elapsed_ms(orchestration_started_at)
self._log_telegram_event(
"blocking_turn_completed",
chat_id=chat_id,
user_id=user.id,
elapsed_ms=self._elapsed_ms(started_at),
user_resolution_ms=user_resolution_ms,
service_init_ms=service_init_ms,
orchestration_ms=orchestration_ms,
input_chars=len(message_text),
response_chars=len(str(response or "")),
)
return response
finally:
tools_db.close()
mock_db.close()

@ -1,5 +1,7 @@
import asyncio
import json
import logging
from time import perf_counter
from typing import Dict, Any, List, Optional
import vertexai
@ -9,6 +11,8 @@ from vertexai.generative_models import FunctionDeclaration, GenerativeModel, Par
from app.core.settings import settings
from app.models.tool_model import ToolDefinition
logger = logging.getLogger(__name__)
IMAGE_ANALYSIS_FAILURE_MESSAGE = "Nao consegui identificar os dados da imagem. Descreva o documento ou envie uma foto mais nitida."
INVALID_RECEIPT_WATERMARK_MESSAGE = "O comprovante enviado nao e valido. Envie um comprovante valido com a marca d'agua SysaltiIA visivel."
VALID_RECEIPT_WATERMARK_MARKER = "[watermark_sysaltiia_ok]"
@ -35,7 +39,14 @@ class LLMService:
configured = settings.vertex_model_name.strip()
fallback_models = ["gemini-2.5-pro", "gemini-2.5-flash", "gemini-2.0-flash-001"]
self.model_names = [configured] + [m for m in fallback_models if m != configured]
self.model_names = self._build_model_sequence(configured, *fallback_models)
self.bundle_model_names = self._build_model_sequence(
settings.vertex_bundle_model_name.strip(),
*self.model_names,
)
def _log_llm_event(self, event: str, **payload) -> None:
    """Emit one structured INFO log line for an LLM-service event with its keyword payload."""
    logger.info(
        "llm_service_event=%s payload=%s",
        event,
        payload,
    )
# Transforma anexos de imagem em uma mensagem textual pronta para o orquestrador.
async def extract_image_workflow_message(
@ -63,10 +74,15 @@ class LLMService:
response = None
last_error = None
selected_model_name = None
attempts = 0
started_at = perf_counter()
for model_name in self.model_names:
attempts += 1
try:
model = self._get_model(model_name)
response = await asyncio.to_thread(model.generate_content, contents)
selected_model_name = model_name
break
except NotFound as err:
last_error = err
@ -74,6 +90,13 @@ class LLMService:
continue
if response is None:
self._log_llm_event(
"image_workflow_failed",
elapsed_ms=round((perf_counter() - started_at) * 1000, 2),
attempts=attempts,
attachments_count=len(attachments),
caption_present=bool(str(caption or "").strip()),
)
if last_error:
raise RuntimeError(
f"Nenhum modelo Vertex disponivel para analise de imagem. Erro: {last_error}"
@ -81,6 +104,14 @@ class LLMService:
raise RuntimeError("Falha ao analisar imagem no Vertex AI.")
payload = self._extract_response_payload(response)
self._log_llm_event(
"image_workflow_completed",
model_name=selected_model_name,
elapsed_ms=round((perf_counter() - started_at) * 1000, 2),
attempts=attempts,
attachments_count=len(attachments),
caption_present=bool(str(caption or "").strip()),
)
extracted_text = (payload.get("response") or "").strip() or (caption or "").strip()
return self._coerce_image_workflow_response(extracted_text)
@ -179,6 +210,14 @@ class LLMService:
LLMService._models[model_name] = model
return model
def _build_model_sequence(self, *model_names: str | None) -> list[str]:
sequence: list[str] = []
for item in model_names:
candidate = str(item or "").strip()
if candidate and candidate not in sequence:
sequence.append(candidate)
return sequence
def _extract_response_payload(self, response) -> Dict[str, Any]:
candidate = response.candidates[0] if getattr(response, "candidates", None) else None
content = getattr(candidate, "content", None)
@ -197,7 +236,22 @@ class LLMService:
if isinstance(text_value, str) and text_value.strip():
text_parts.append(text_value)
response_text = "\n".join(text_parts).strip() or None
response_text = "\n".join(text_parts).strip()
if not response_text:
fallback_text = None
for carrier in (response, candidate, content):
if carrier is None:
continue
try:
text_value = getattr(carrier, "text", None)
except (AttributeError, ValueError):
text_value = None
if isinstance(text_value, str) and text_value.strip():
fallback_text = text_value.strip()
break
response_text = fallback_text or None
else:
response_text = response_text or None
return {
"response": response_text,
"tool_call": tool_call,
@ -208,21 +262,34 @@ class LLMService:
message: str,
tools: List[ToolDefinition],
history: List[Dict[str, Any]] = None,
preferred_models: List[str] | None = None,
generation_config: Dict[str, Any] | None = None,
) -> Dict[str, Any]:
"""Gera resposta textual ou chamada de tool a partir da mensagem do usuario."""
vertex_tools = self.build_vertex_tools(tools)
candidate_models = self._build_model_sequence(*(preferred_models or []), *self.model_names)
response = None
last_error = None
selected_model_name = None
attempts = 0
started_at = perf_counter()
# Tenta o modelo configurado e cai para nomes alternativos
# quando o principal nao estiver disponivel no projeto/regiao.
for model_name in self.model_names:
for model_name in candidate_models:
attempts += 1
try:
model = self._get_model(model_name)
chat = model.start_chat(history=history or [])
send_kwargs = {"tools": vertex_tools} if vertex_tools else {}
if generation_config:
send_kwargs["generation_config"] = generation_config
if history:
chat = model.start_chat(history=history)
response = await asyncio.to_thread(chat.send_message, message, **send_kwargs)
else:
response = await asyncio.to_thread(model.generate_content, message, **send_kwargs)
selected_model_name = model_name
break
except NotFound as err:
last_error = err
@ -230,13 +297,30 @@ class LLMService:
continue
if response is None:
self._log_llm_event(
"generate_response_failed",
elapsed_ms=round((perf_counter() - started_at) * 1000, 2),
attempts=attempts,
tools_count=len(tools or []),
history_count=len(history or []),
)
if last_error:
raise RuntimeError(
f"Nenhum modelo Vertex disponivel. Verifique VERTEX_MODEL_NAME e acesso no projeto. Erro: {last_error}"
) from last_error
raise RuntimeError("Falha ao gerar resposta no Vertex AI.")
return self._extract_response_payload(response)
payload = self._extract_response_payload(response)
self._log_llm_event(
"generate_response_completed",
model_name=selected_model_name,
elapsed_ms=round((perf_counter() - started_at) * 1000, 2),
attempts=attempts,
tools_count=len(tools or []),
history_count=len(history or []),
tool_call=bool(payload.get("tool_call")),
)
return payload
async def warmup(self) -> None:
"""Preaquece conexao/modelo para reduzir latencia da primeira requisicao real."""
@ -248,3 +332,5 @@ class LLMService:
except Exception:
# Warmup e melhor esforco; falhas nao devem bloquear inicializacao.
return

@ -182,7 +182,7 @@ class OrderFlowMixin:
return
cpf = extract_cpf_from_text(message)
if cpf and self._is_valid_cpf(cpf):
payload["cpf"] = cpf
self._set_order_cpf(payload=payload, cpf=cpf, confirmed=True)
def _try_capture_order_budget_from_message(self, user_id: int | None, message: str, payload: dict) -> None:
if not self._has_explicit_order_request(message) and self.state.get_entry("pending_order_drafts", user_id, expire=True) is None:
@ -211,7 +211,7 @@ class OrderFlowMixin:
memory = context.get("generic_memory", {})
cpf = memory.get("cpf")
if isinstance(cpf, str) and self._is_valid_cpf(cpf):
payload["cpf"] = cpf
self._set_order_cpf(payload=payload, cpf=cpf, confirmed=False, source="memory")
def _try_prefill_order_cpf_from_user_profile(self, user_id: int | None, payload: dict) -> None:
if user_id is None or payload.get("cpf"):
@ -221,10 +221,134 @@ class OrderFlowMixin:
try:
user = db.query(User).filter(User.id == user_id).first()
if user and isinstance(user.cpf, str) and self._is_valid_cpf(user.cpf):
payload["cpf"] = user.cpf
self._set_order_cpf(payload=payload, cpf=user.cpf, confirmed=False, source="user_profile")
finally:
db.close()
def _set_order_cpf(
self,
payload: dict,
cpf: str,
*,
confirmed: bool,
source: str | None = None,
) -> None:
if not isinstance(payload, dict):
return
payload["cpf"] = str(cpf)
payload["cpf_confirmed"] = bool(confirmed)
if confirmed:
payload.pop("cpf_confirmation_source", None)
return
if source:
payload["cpf_confirmation_source"] = str(source)
def _clear_order_cpf_confirmation_state(self, payload: dict) -> None:
if not isinstance(payload, dict):
return
payload.pop("cpf_confirmed", None)
payload.pop("cpf_confirmation_source", None)
def _mask_order_cpf(self, cpf: str) -> str:
digits = re.sub(r"\D", "", str(cpf or ""))
if len(digits) != 11:
return str(cpf or "").strip()
return f"{digits[:3]}.***.***-{digits[-2:]}"
def _render_known_order_cpf_confirmation_prompt(self, cpf: str) -> str:
    """Build the pt-BR prompt asking the user to confirm a previously captured CPF (shown masked)."""
    return (
        f"Encontrei um CPF informado anteriormente: {self._mask_order_cpf(cpf)}.\n"
        "Ele continua correto para concluir o pedido? Responda com sim ou nao."
    )
def _has_order_vehicle_search_criteria(self, message: str, payload: dict | None = None) -> bool:
    """Return True when the message or payload carries any usable vehicle-search criterion.

    Checked in order: an explicit budget in the text, a vehicle id or model
    already in the payload, a known category word in the normalized message,
    or any vehicle-reference token extracted from the text.
    """
    if extract_budget_from_text(message) is not None:
        return True
    normalized_payload = payload if isinstance(payload, dict) else {}
    if normalized_payload.get("vehicle_id") or normalized_payload.get("modelo_veiculo"):
        return True
    normalized_message = self._normalize_text(message).strip()
    # Category words that identify a vehicle-type request (pt-BR plus common English terms).
    category_terms = {
        "suv",
        "sedan",
        "hatch",
        "pickup",
        "picape",
        "utilitario",
        "utilitario esportivo",
    }
    if any(term in normalized_message for term in category_terms):
        return True
    return bool(self._extract_vehicle_reference_tokens(message))
def _clear_order_search_memory(self, user_id: int | None) -> None:
    """Remove remembered vehicle-search hints (max budget, vehicle profile) from the user's context memories."""
    if user_id is None:
        return
    context = self._get_user_context(user_id)
    if not isinstance(context, dict):
        return
    # Both memory buckets carry the same two search hints.
    for memory_key in ("generic_memory", "shared_memory"):
        memory = context.get(memory_key)
        if isinstance(memory, dict):
            memory.pop("orcamento_max", None)
            memory.pop("perfil_veiculo", None)
    self._save_user_context(user_id=user_id, context=context)
def _remember_order_cpf_in_context(self, user_id: int | None, cpf: str) -> None:
    """Persist a validated CPF into the user's generic and shared context memories."""
    if user_id is None or not self._is_valid_cpf(cpf):
        return
    context = self._get_user_context(user_id)
    if not isinstance(context, dict):
        return
    cpf_text = str(cpf)
    generic_memory = context.get("generic_memory")
    if not isinstance(generic_memory, dict):
        generic_memory = {}
        context["generic_memory"] = generic_memory
    generic_memory["cpf"] = cpf_text
    shared_memory = context.setdefault("shared_memory", {})
    shared_memory["cpf"] = cpf_text
    self._save_user_context(user_id=user_id, context=context)
def _try_handle_known_order_cpf_confirmation(
    self,
    message: str,
    payload: dict,
) -> str | None:
    """Drive the yes/no confirmation of a CPF that was prefilled (not user-confirmed).

    Returns a reply string when the flow must ask or correct something, or
    None when processing should continue normally. Only runs when the payload
    has a CPF explicitly flagged as unconfirmed and a vehicle is already chosen.
    """
    if not isinstance(payload, dict):
        return None
    cpf_value = payload.get("cpf")
    # Only intercept when cpf_confirmed is explicitly False (absent means nothing to confirm).
    if not cpf_value or payload.get("cpf_confirmed", True) is not False:
        return None
    if not payload.get("vehicle_id"):
        return None
    # An invalid-looking CPF attempt in the message gets an immediate correction prompt.
    cpf_attempt = self._extract_order_cpf_attempt(message)
    if cpf_attempt and not self._is_valid_cpf(cpf_attempt):
        return "Para seguir com o pedido, preciso de um CPF valido. Pode me informar novamente?"
    # A valid CPF in the message replaces the prefilled one, already confirmed.
    cpf_from_message = extract_cpf_from_text(message)
    if cpf_from_message and self._is_valid_cpf(cpf_from_message):
        self._set_order_cpf(payload=payload, cpf=cpf_from_message, confirmed=True)
        return None
    # "Yes" confirms the prefilled CPF; "no" discards it and asks for a new one.
    if self._is_affirmative_message(message):
        payload["cpf_confirmed"] = True
        payload.pop("cpf_confirmation_source", None)
        return None
    if self._is_negative_message(message):
        payload.pop("cpf", None)
        self._clear_order_cpf_confirmation_state(payload)
        return "Sem problema. Me informe o CPF correto para eu concluir o pedido."
    # Anything else: re-ask the confirmation question with the masked CPF.
    return self._render_known_order_cpf_confirmation_prompt(str(cpf_value))
def _get_last_stock_results(self, user_id: int | None) -> list[dict]:
    """Delegate to the order-flow state support to fetch the user's last stock-search results."""
    return self._order_flow_state_support.get_last_stock_results(user_id=user_id)
@ -629,10 +753,7 @@ class OrderFlowMixin:
def _render_missing_order_fields_prompt(self, missing_fields: list[str]) -> str:
if missing_fields == ["vehicle_id"]:
return (
"Para seguir com o pedido, me diga qual carro voce procura.\n"
"Se preferir, posso listar opcoes por faixa de preco, modelo ou tipo de carro."
)
return "Pode me dizer a faixa de preco, o modelo ou o tipo de carro que voce procura."
labels = {
"cpf": "o CPF do cliente",
"vehicle_id": "qual veiculo do estoque voce quer comprar",
@ -815,6 +936,24 @@ class OrderFlowMixin:
"expires_at": utc_now() + timedelta(minutes=PENDING_ORDER_DRAFT_TTL_MINUTES),
}
if (
draft.get("payload") == {}
and explicit_order_request
and not should_bootstrap_from_context
and not self._has_order_vehicle_search_criteria(message=message, payload=extracted)
):
self._reset_order_stock_context(user_id=user_id)
self._clear_order_search_memory(user_id=user_id)
draft["expires_at"] = utc_now() + timedelta(minutes=PENDING_ORDER_DRAFT_TTL_MINUTES)
self._set_order_flow_entry(
"pending_order_drafts",
user_id,
"order_create",
draft,
active_task="order_create",
)
return self._render_missing_order_fields_prompt(["vehicle_id"])
draft["payload"].update(extracted)
self._try_capture_order_cpf_from_message(message=message, payload=draft["payload"])
cpf_attempt = self._extract_order_cpf_attempt(message)
@ -877,9 +1016,25 @@ class OrderFlowMixin:
self._store_selected_vehicle(user_id=user_id, vehicle=resolved_vehicle)
draft["payload"].update(self._vehicle_to_payload(resolved_vehicle))
cpf_confirmation_response = self._try_handle_known_order_cpf_confirmation(
message=message,
payload=draft["payload"],
)
if cpf_confirmation_response:
draft["expires_at"] = utc_now() + timedelta(minutes=PENDING_ORDER_DRAFT_TTL_MINUTES)
self._set_order_flow_entry(
"pending_order_drafts",
user_id,
"order_create",
draft,
active_task="order_create",
)
return cpf_confirmation_response
cpf_value = draft["payload"].get("cpf")
if cpf_value and not self._is_valid_cpf(str(cpf_value)):
draft["payload"].pop("cpf", None)
self._clear_order_cpf_confirmation_state(draft["payload"])
self._set_order_flow_entry(
"pending_order_drafts",
user_id,
@ -894,8 +1049,10 @@ class OrderFlowMixin:
cpf=str(cpf_value),
user_id=user_id,
)
self._remember_order_cpf_in_context(user_id=user_id, cpf=str(cpf_value))
except ValueError as exc:
draft["payload"].pop("cpf", None)
self._clear_order_cpf_confirmation_state(draft["payload"])
self._set_order_flow_entry(
"pending_order_drafts",
user_id,

@ -21,6 +21,24 @@ class RentalFlowMixin:
setattr(self, "__rental_flow_state_support", support)
return support
def _rental_now(self) -> datetime:
provider = getattr(self, "_rental_now_provider", None)
if callable(provider):
return provider()
return datetime.now()
# Fixes common corrupted variants of relative dates coming from external channels.
def _normalize_rental_relative_text(self, text: str) -> str:
    """Normalize *text* and repair corrupted relative-date endings ("amanh?" -> "amanha", etc.)."""
    normalized = technical_normalizer.normalize_text(text)
    # Order matters: the longer "depois de amanh?" form must be repaired
    # before the bare "amanh?" pattern could match inside it.
    replacements = (
        (r"depois\s+de\s+amanh\?", "depois de amanha"),
        (r"amanh\?", "amanha"),
        (r"hoj\?", "hoje"),
        (r"\bat\?\b", "ate"),
    )
    for pattern, replacement in replacements:
        normalized = re.sub(pattern, replacement, normalized)
    return normalized
# Sanitizes fleet results before storing them in the context.
def _sanitize_rental_results(self, rental_results: list[dict] | None) -> list[dict]:
    """Delegate sanitization of fleet results to the rental state support."""
    return self._rental_flow_state_support.sanitize_rental_results(rental_results)
@ -37,12 +55,50 @@ class RentalFlowMixin:
return self._rental_flow_state_support.get_last_rental_results(user_id=user_id)
# Guarda a lista atual para permitir selecao do veiculo em mensagens seguintes.
def _store_pending_rental_selection(
    self,
    user_id: int | None,
    rental_results: list[dict] | None,
    search_payload: dict | None = None,
) -> None:
    """Store the current fleet list (plus the optional search snapshot) so a later message can select a vehicle.

    Fix: the stale pre-change one-line signature was left interleaved with the
    new multi-line definition (stripped-diff residue), producing two
    conflicting `def` lines; only the new signature is kept.
    """
    self._rental_flow_state_support.store_pending_rental_selection(
        user_id=user_id,
        rental_results=rental_results,
        search_payload=search_payload,
    )
# Retrieves the last rental-search snapshot saved in the context.
def _get_last_rental_search_payload(self, user_id: int | None) -> dict:
    """Return the most recent reusable rental-search payload for the user.

    Prefers the snapshot attached to a still-valid pending selection; falls
    back to the payload stored in the user context. Returns {} when nothing
    usable is found.
    """
    if user_id is None:
        return {}
    pending_selection = self.state.get_entry("pending_rental_selections", user_id, expire=True)
    if isinstance(pending_selection, dict):
        pending_search_payload = self._sanitize_rental_search_payload(pending_selection.get("search_payload"))
        if pending_search_payload:
            return pending_search_payload
    # Guard: this mixin may be composed without the context helpers.
    if not hasattr(self, "_get_user_context"):
        return {}
    context = self._get_user_context(user_id)
    if not isinstance(context, dict):
        return {}
    return self._sanitize_rental_search_payload(context.get("last_rental_search_payload"))
# Persists the reusable rental-search fields in the user context.
def _store_last_rental_search_payload(self, user_id: int | None, payload) -> None:
    """Save the sanitized rental-search payload in the user context, or clear the stored one when nothing survives sanitization."""
    has_context_hooks = hasattr(self, "_get_user_context") and hasattr(self, "_save_user_context")
    if user_id is None or not has_context_hooks:
        return
    context = self._get_user_context(user_id)
    if not isinstance(context, dict):
        return
    sanitized = self._sanitize_rental_search_payload(payload)
    if sanitized:
        context["last_rental_search_payload"] = sanitized
    else:
        context.pop("last_rental_search_payload", None)
    self._save_user_context(user_id=user_id, context=context)
# Reads the chosen rental vehicle previously saved in the context.
def _get_selected_rental_vehicle(self, user_id: int | None) -> dict | None:
    """Delegate to the rental state support to fetch the vehicle the user selected, if any."""
    return self._rental_flow_state_support.get_selected_rental_vehicle(user_id=user_id)
@ -51,6 +107,39 @@ class RentalFlowMixin:
def _sanitize_rental_contract_snapshot(self, payload) -> dict | None:
    """Delegate sanitization of a rental-contract snapshot to the rental state support."""
    return self._rental_flow_state_support.sanitize_rental_contract_snapshot(payload)
# Keeps only the search fields that can be reused before a vehicle is chosen.
def _sanitize_rental_search_payload(self, payload) -> dict:
    """Return a copy of *payload* containing only normalized, reusable search fields.

    Each field is individually normalized (category, plate, CPF, model hint,
    daily-budget cap, start/end dates) and dropped when empty or invalid.
    Returns {} for non-dict input.
    """
    if not isinstance(payload, dict):
        return {}
    sanitized: dict = {}
    category = self._extract_rental_category_from_text(str(payload.get("categoria") or ""))
    if category:
        sanitized["categoria"] = category
    plate = technical_normalizer.normalize_plate(payload.get("placa"))
    if plate:
        sanitized["placa"] = plate
    cpf = technical_normalizer.normalize_cpf(payload.get("cpf"))
    if cpf:
        sanitized["cpf"] = cpf
    # A "model" that is really a category word is discarded to avoid double-counting.
    model_hint = str(payload.get("modelo") or "").strip(" ,.;")
    if model_hint and not self._extract_rental_category_from_text(model_hint):
        sanitized["modelo"] = model_hint.title()
    budget = technical_normalizer.normalize_positive_number(payload.get("valor_diaria_max"))
    if budget is not None:
        sanitized["valor_diaria_max"] = float(budget)
    for field_name in ("data_inicio", "data_fim_prevista"):
        normalized = self._normalize_rental_datetime_text(payload.get(field_name))
        if normalized:
            sanitized[field_name] = normalized
    return sanitized
# Retrieves the last rental contract remembered for the user.
def _get_last_rental_contract(self, user_id: int | None) -> dict | None:
    """Delegate to the rental state support to fetch the user's last remembered rental contract."""
    return self._rental_flow_state_support.get_last_rental_contract(user_id=user_id)
@ -97,7 +186,7 @@ class RentalFlowMixin:
# Extrai um modelo ou marca/modelo quando o pedido for mais especifico.
def _extract_rental_model_from_text(self, text: str) -> str | None:
normalized = self._normalize_text(text).strip()
normalized = self._normalize_rental_relative_text(text).strip()
if not normalized:
return None
@ -105,6 +194,13 @@ class RentalFlowMixin:
normalized = re.sub(r"\b\d{4}[/-]\d{1,2}[/-]\d{1,2}(?:\s+\d{1,2}:\d{2}(?::\d{2})?)?\b", " ", normalized)
normalized = re.sub(r"\b[a-z]{3}\d[a-z0-9]\d{2}\b", " ", normalized)
normalized = re.sub(r"\br\$\s*\d+[\d\.,]*\b", " ", normalized)
normalized = re.sub(
r"\b(?:depois\s+de\s+amanh(?:a)?|day\s+after\s+tomorrow|amanh(?:a)?|tomorrow|hoj(?:e)?|today)"
r"(?:\s+(?:as|a))?"
r"(?:\s+(?:\d{1,2}:\d{2}(?::\d{2})?|\d{1,2}\s*(?:h|hora|horas)))?\b",
" ",
normalized,
)
category = self._extract_rental_category_from_text(normalized)
if category:
@ -153,6 +249,8 @@ class RentalFlowMixin:
"barata",
"economico",
"economica",
"ate",
"at",
}
generic_tokens = {
"aluguel",
@ -200,6 +298,8 @@ class RentalFlowMixin:
continue
if re.fullmatch(r"(?:19|20)\d{2}", token):
continue
if re.fullmatch(r"\d{1,2}h", token):
continue
if len(token) < 2:
continue
tokens.append(token)
@ -213,19 +313,38 @@ class RentalFlowMixin:
# Collects rental dates from free text, preserving the order in which they appear.
def _extract_rental_datetimes_from_text(self, text: str) -> list[str]:
    """Extract and normalize every absolute or relative date/time mention in *text*.

    Returns the normalized date strings de-duplicated and ordered by their
    position in the original message.

    Fix: stale pre-change lines were interleaved with the new implementation
    (stripped-diff residue: the old one-line normalization call, an early
    `results` declaration, and the old dedup condition); only the coherent
    post-change implementation is kept.
    """
    normalized = technical_normalizer.normalize_datetime_connector(
        self._normalize_rental_relative_text(text)
    )
    # Absolute dates: dd/mm/yyyy and yyyy-mm-dd, each with an optional time.
    patterns = (
        r"\b\d{1,2}[/-]\d{1,2}[/-]\d{4}(?:\s+\d{1,2}:\d{2}(?::\d{2})?)?\b",
        r"\b\d{4}[/-]\d{1,2}[/-]\d{1,2}(?:\s+\d{1,2}:\d{2}(?::\d{2})?)?\b",
    )
    matches: list[tuple[int, str]] = []
    for pattern in patterns:
        for match in re.finditer(pattern, normalized):
            candidate = self._normalize_rental_datetime_text(match.group(0))
            if candidate:
                matches.append((match.start(), candidate))
    # Relative dates ("hoje"/"amanha"/"depois de amanha" and English variants),
    # optionally followed by a connector ("as"/"a") and a time.
    relative_pattern = (
        r"\b(?:depois\s+de\s+amanh(?:a)?|day\s+after\s+tomorrow|amanh(?:a)?|tomorrow|hoj(?:e)?|today)"
        r"(?:\s+(?:as|a))?"
        r"(?:\s+(?:\d{1,2}:\d{2}(?::\d{2})?|\d{1,2}\s*(?:h|hora|horas)))?"
    )
    for match in re.finditer(relative_pattern, normalized):
        candidate = self._normalize_rental_datetime_text(match.group(0))
        if candidate:
            matches.append((match.start(), candidate))
    # De-duplicate while keeping the textual order of first appearance.
    results: list[str] = []
    seen: set[str] = set()
    for _, candidate in sorted(matches, key=lambda item: item[0]):
        if candidate in seen:
            continue
        seen.add(candidate)
        results.append(candidate)
    return results
# Normaliza datas de locacao para um formato unico aceito pelo fluxo.
def _normalize_rental_datetime_text(self, value) -> str | None:
text = technical_normalizer.normalize_datetime_connector(str(value or "").strip())
@ -246,11 +365,28 @@ class RentalFlowMixin:
),
)
if parsed is None:
normalized = self._normalize_rental_relative_text(text)
day_offset = None
if "depois de amanha" in normalized or "depois de amanh" in normalized or "day after tomorrow" in normalized:
day_offset = 2
elif "amanha" in normalized or "amanh" in normalized or "tomorrow" in normalized:
day_offset = 1
elif "hoje" in normalized or "hoj" in normalized or "today" in normalized:
day_offset = 0
if day_offset is None:
return None
time_text = technical_normalizer.extract_hhmm_from_text(normalized)
if not time_text:
return None
hour_text, minute_text = time_text.split(":")
current_datetime = self._rental_now()
target_date = current_datetime + timedelta(days=day_offset)
return f"{target_date.strftime('%d/%m/%Y')} {int(hour_text):02d}:{int(minute_text):02d}"
if ":" in text:
return parsed.strftime("%d/%m/%Y %H:%M")
return parsed.strftime("%d/%m/%Y")
# Normaliza campos estruturados de aluguel antes de montar o draft.
def _normalize_rental_fields(self, data) -> dict:
if not isinstance(data, dict):
@ -467,6 +603,39 @@ class RentalFlowMixin:
lines.append("Pode responder com o numero da lista, com a placa ou com o modelo.")
return "\n".join(lines)
# Semeia o draft da locacao quando a frota e listada pelo caminho generico.
def _seed_pending_rental_draft_from_message(self, message: str, user_id: int | None) -> None:
    """Seed or refresh the user's pending rental draft from a free-form message.

    No-op unless the message is an explicit rental request. Captured fields
    are merged into the draft payload; when at least one field is captured,
    the draft TTL is renewed, the rental flow is marked active, and the
    last-search payload / pending selection caches are updated.
    """
    if user_id is None:
        return
    if not self._has_explicit_rental_request(message):
        return
    draft = self.state.get_entry("pending_rental_drafts", user_id, expire=True)
    if not isinstance(draft, dict):
        draft = {
            "payload": {},
            "expires_at": utc_now() + timedelta(minutes=PENDING_RENTAL_DRAFT_TTL_MINUTES),
        }
    captured = draft.get("payload")
    if not isinstance(captured, dict):
        captured = {}
        draft["payload"] = captured
    self._try_capture_rental_fields_from_message(message=message, payload=captured)
    if not captured:
        # Nothing actionable was extracted; leave state untouched.
        return
    draft["expires_at"] = utc_now() + timedelta(minutes=PENDING_RENTAL_DRAFT_TTL_MINUTES)
    self.state.set_entry("pending_rental_drafts", user_id, draft)
    self._mark_rental_flow_active(user_id=user_id, active_task="rental_create")
    self._store_last_rental_search_payload(user_id=user_id, payload=captured)
    known_results = self._get_last_rental_results(user_id=user_id)
    if known_results:
        self._store_pending_rental_selection(
            user_id=user_id,
            rental_results=known_results,
            search_payload=captured,
        )
# Query the fleet and keep the result for the selection step.
async def _try_list_rental_fleet_for_selection(
self,
@ -511,6 +680,11 @@ class RentalFlowMixin:
rental_results = tool_result if isinstance(tool_result, list) else []
self._remember_rental_results(user_id=user_id, rental_results=rental_results)
self._store_pending_rental_selection(
user_id=user_id,
rental_results=rental_results,
search_payload=payload,
)
self._mark_rental_flow_active(user_id=user_id)
return self._fallback_format_tool_result("consultar_frota_aluguel", tool_result)
@ -547,6 +721,13 @@ class RentalFlowMixin:
):
return None
remembered_search_payload = self._get_last_rental_search_payload(user_id=user_id)
if draft is None and remembered_search_payload:
draft = {
"payload": dict(remembered_search_payload),
"expires_at": utc_now() + timedelta(minutes=PENDING_RENTAL_DRAFT_TTL_MINUTES),
}
if draft is None:
draft = {
"payload": {},
@ -577,6 +758,7 @@ class RentalFlowMixin:
draft["expires_at"] = utc_now() + timedelta(minutes=PENDING_RENTAL_DRAFT_TTL_MINUTES)
self.state.set_entry("pending_rental_drafts", user_id, draft)
self._mark_rental_flow_active(user_id=user_id, active_task="rental_create")
self._store_last_rental_search_payload(user_id=user_id, payload=draft_payload)
missing = [field for field in RENTAL_REQUIRED_FIELDS if field not in draft_payload]
if missing:
@ -626,4 +808,3 @@ class RentalFlowMixin:
user_id=user_id,
)
return self._fallback_format_tool_result("abrir_locacao_aluguel", tool_result)

@ -155,20 +155,35 @@ class RentalFlowStateSupport(FlowStateSupport):
rental_results = context.get("last_rental_results") or []
return self.sanitize_rental_results(rental_results if isinstance(rental_results, list) else [])
def store_pending_rental_selection(
    self,
    user_id: int | None,
    rental_results: list[dict] | None,
    search_payload: dict | None = None,
) -> None:
    """Persist the sanitized rental results the user may pick from.

    Clears the pending selection when nothing survives sanitization.
    The associated search payload is kept or replaced: an explicit
    `search_payload` argument wins, otherwise the one already stored for
    the user is carried over.

    Note: the merge left both the pre- and post-change lines of this
    method in place (duplicate signature and a stale inline dict passed to
    set_state_entry), which is not valid Python; this is the reconstructed
    current version.
    """
    if user_id is None:
        return
    sanitized = self.sanitize_rental_results(rental_results)
    if not sanitized:
        # No selectable vehicles: drop any stale pending selection.
        self.pop_state_entry("pending_rental_selections", user_id)
        return
    entry = {
        "payload": sanitized,
        "expires_at": utc_now() + timedelta(minutes=PENDING_RENTAL_SELECTION_TTL_MINUTES),
    }
    current = self.get_state_entry("pending_rental_selections", user_id, expire=True)
    current_search_payload = current.get("search_payload") if isinstance(current, dict) else None
    # An explicit payload wins; otherwise carry over the stored one.
    candidate_search_payload = search_payload if isinstance(search_payload, dict) else current_search_payload
    sanitized_search_payload = self.service._sanitize_rental_search_payload(candidate_search_payload)
    if sanitized_search_payload:
        entry["search_payload"] = sanitized_search_payload
    self.set_state_entry(
        "pending_rental_selections",
        user_id,
        entry,
    )
def get_selected_rental_vehicle(self, user_id: int | None) -> dict | None:

@ -111,6 +111,7 @@ class ReviewFlowMixin:
if not text:
return None
stop_terms = {
"depois de amanha",
"amanha",
"hoje",
"revisao",
@ -142,7 +143,7 @@ class ReviewFlowMixin:
)
if explicit_match:
raw_model = explicit_match.group(1)
raw_model = re.split(r"\b(?:ele e|ele eh|ano|placa|km|quilometragem|data|amanha|hoje)\b", raw_model, maxsplit=1)[0]
raw_model = re.split(r"\b(?:ele e|ele eh|ano|placa|km|quilometragem|data|depois de amanha|amanha|hoje)\b", raw_model, maxsplit=1)[0]
return self._clean_review_model_candidate(raw_model)
has_year = bool(re.search(r"(?<!\d)(19\d{2}|20\d{2}|2100)(?!\d)", normalized_message))
@ -166,7 +167,7 @@ class ReviewFlowMixin:
raw_model = re.split(r"(?<!\d)(?:19\d{2}|20\d{2}|2100)(?!\d)", raw_model, maxsplit=1)[0]
raw_model = re.split(r"(?<!\d)(?:\d{1,3}(?:[.\s]\d{3})+|\d{2,6})\s*km\b", raw_model, maxsplit=1, flags=re.IGNORECASE)[0]
raw_model = re.split(
r"\b(?:placa|quilometragem|data|amanha|hoje|nunca fiz revisao|nao fiz revisao|nunca revisei|ja fiz revisao|fiz revisao|ja revisei|na concessionaria|concessionaria)\b",
r"\b(?:placa|quilometragem|data|depois de amanha|amanha|hoje|nunca fiz revisao|nao fiz revisao|nunca revisei|ja fiz revisao|fiz revisao|ja revisei|na concessionaria|concessionaria)\b",
raw_model,
maxsplit=1,
)[0]
@ -235,6 +236,8 @@ class ReviewFlowMixin:
return None
if "hoje" in normalized_text:
return self._review_now().strftime("%d/%m/%Y")
if "depois de amanha" in normalized_text:
return (self._review_now() + timedelta(days=2)).strftime("%d/%m/%Y")
if "amanha" in normalized_text:
return (self._review_now() + timedelta(days=1)).strftime("%d/%m/%Y")
return None
@ -510,6 +513,15 @@ class ReviewFlowMixin:
)
except HTTPException as exc:
error = self.tool_executor.coerce_http_error(exc)
self._capture_review_confirmation_suggestion(
tool_name="editar_data_revisao",
arguments={
"protocolo": draft["payload"]["protocolo"],
"nova_data_hora": draft["payload"]["nova_data_hora"],
},
exc=exc,
user_id=user_id,
)
if error.get("retryable") and error.get("field"):
draft["payload"].pop(str(error["field"]), None)
draft["expires_at"] = utc_now() + timedelta(minutes=PENDING_REVIEW_DRAFT_TTL_MINUTES)

@ -338,7 +338,7 @@ class EntityNormalizer:
stripped = re.sub(r"\s*```$", "", stripped)
return stripped.strip()
def _try_parse_json_candidate(self, candidate: str):
def _try_parse_json_candidate(self, candidate: str, depth: int = 0):
normalized = str(candidate or "").strip()
if not normalized:
return None
@ -359,15 +359,32 @@ class EntityNormalizer:
for variant in variants:
try:
return json.loads(variant)
parsed = json.loads(variant)
except json.JSONDecodeError:
pass
parsed = None
coerced = self._coerce_parsed_json_candidate(parsed, depth=depth)
if isinstance(coerced, dict):
return coerced
try:
parsed = ast.literal_eval(variant)
except (ValueError, SyntaxError):
continue
coerced = self._coerce_parsed_json_candidate(parsed, depth=depth)
if isinstance(coerced, dict):
return coerced
return None
def _coerce_parsed_json_candidate(self, parsed, depth: int = 0):
if isinstance(parsed, dict):
return parsed
if depth >= 2:
return None
if isinstance(parsed, list) and len(parsed) == 1:
return self._coerce_parsed_json_candidate(parsed[0], depth=depth + 1)
if isinstance(parsed, str):
nested = parsed.strip()
if nested:
return self._try_parse_json_candidate(nested, depth=depth + 1)
return None
def coerce_turn_decision(self, payload) -> dict:
@ -431,7 +448,12 @@ class EntityNormalizer:
normalized["tool_name"] = tool_name or None
tool_arguments = normalized.get("tool_arguments")
if tool_name and isinstance(tool_arguments, dict):
normalized["tool_arguments"] = self.normalize_tool_arguments(tool_name, tool_arguments)
merged_tool_arguments = self._merge_tool_arguments_from_turn_entities(
tool_name=tool_name,
tool_arguments=tool_arguments,
entities=normalized.get("entities"),
)
normalized["tool_arguments"] = self.normalize_tool_arguments(tool_name, merged_tool_arguments)
else:
normalized["tool_arguments"] = tool_arguments if isinstance(tool_arguments, dict) else {}
@ -707,6 +729,28 @@ class EntityNormalizer:
return payload
def _merge_tool_arguments_from_turn_entities(self, tool_name: str | None, tool_arguments: dict, entities: dict | None) -> dict:
    """Backfill `consultar_estoque` arguments from turn-level generic memory.

    Only `preco_max` and `categoria` are filled, and only when they are
    missing-ish in the explicit tool arguments; every other tool passes
    through unchanged.
    """
    emptyish = (None, "", [], {})
    merged = dict(tool_arguments or {})
    if self.normalize_tool_name(tool_name) != "consultar_estoque":
        return merged
    memory = entities.get("generic_memory") if isinstance(entities, dict) else {}
    if not isinstance(memory, dict):
        return merged
    if merged.get("preco_max") in emptyish:
        budget = memory.get("orcamento_max")
        if budget not in emptyish:
            merged["preco_max"] = budget
    if merged.get("categoria") in emptyish:
        profiles = self.normalize_vehicle_profile(memory.get("perfil_veiculo"))
        # Only adopt the profile when it is unambiguous.
        if len(profiles) == 1:
            merged["categoria"] = profiles[0]
    return merged
def _normalize_turn_missing_fields(self, missing_fields) -> list[str]:
if missing_fields is None:
return []

@ -59,6 +59,135 @@ class MessagePlanner:
logger.exception("Falha ao extrair plano da mensagem com LLM. user_id=%s", user_id)
return default
async def extract_turn_bundle(self, message: str, user_id: int | None) -> dict:
    """Extract turn_decision and message_plan in a single structured LLM call.

    Returns a dict with keys ``turn_decision``, ``message_plan``,
    ``has_turn_decision`` and ``has_message_plan``. The LLM is retried once
    on invalid/incomplete output; empty defaults are returned when both
    attempts fail.

    Fixes over the previous version:
    - On the second attempt a non-dict payload used to fall through to
      ``payload.get(...)``, raising AttributeError (caught and logged as a
      spurious exception); now it breaks out cleanly.
    - The redundant ``and`` + ``or`` double return was collapsed into one.
    """
    user_context = f"user_id={user_id}" if user_id is not None else "user_id=anonimo"
    default_turn_decision = self.normalizer.empty_turn_decision()
    default_message_plan = self.normalizer.empty_message_plan(message=message)
    compact_turn_entities = {
        "generic_memory": {},
        "review_fields": {},
        "review_management_fields": {},
        "order_fields": {},
        "cancel_order_fields": {},
    }
    compact_order_entities = {
        **compact_turn_entities,
        "intents": {},
    }
    schema_example = json.dumps(
        {
            "turn_decision": {
                "intent": "general",
                "domain": "general",
                "action": "answer_user",
                "entities": compact_turn_entities,
                "missing_fields": [],
                "selection_index": None,
                "tool_name": None,
                "tool_arguments": {},
                "response_to_user": None,
            },
            "message_plan": {
                "orders": [
                    {
                        "domain": "general",
                        "message": "trecho literal do pedido",
                        "entities": compact_order_entities,
                    }
                ]
            },
        },
        ensure_ascii=True,
    )
    prompt = (
        "Analise a mensagem do usuario e retorne APENAS JSON valido com duas secoes: turn_decision e message_plan.\n"
        "Sem markdown, sem texto fora do JSON, sem inventar dados ausentes.\n\n"
        "Contrato:\n"
        f"{schema_example}\n\n"
        "Regras:\n"
        "- turn_decision resume a intencao principal do turno; domain deve ser review, sales ou general.\n"
        "- message_plan.orders separa pedidos operacionais em ordem; se nao houver pedido operacional, use um unico item general com a mensagem inteira.\n"
        "- Cada order deve ter domain, message e entities; mantenha message curta e fiel ao texto do usuario.\n"
        "- Preencha apenas dados claros. Use entities.generic_memory.orcamento_max para teto/faixa de preco e perfil_veiculo para suv/sedan/hatch/pickup.\n"
        "- Se faltar dado para continuar um fluxo, use action=ask_missing_fields e preencha missing_fields e response_to_user. Se nao houver acao operacional, use action=answer_user.\n"
        "- Compra efetiva: intent=order_create, domain=sales, prefira tool_name=realizar_pedido.\n"
        "- Listar pedidos: intent=order_list, domain=sales, action=call_tool, tool_name=listar_pedidos.\n"
        "- Consultar/listar/buscar/ver estoque para compra: intent=inventory_search, domain=sales, action=call_tool, tool_name=consultar_estoque; tool_arguments so com filtros explicitamente pedidos, como preco_max, categoria e opcionalmente limite.\n"
        "- Listar revisoes: intent=review_list, domain=review, action=call_tool, tool_name=listar_agendamentos_revisao.\n"
        "- Cancelar revisao: intent=review_cancel, domain=review, prefira tool_name=cancelar_agendamento_revisao.\n"
        "- Remarcar revisao: intent=review_reschedule, domain=review, prefira tool_name=editar_data_revisao.\n"
        "- Avaliar troca com modelo, ano e km: domain=sales, action=call_tool, tool_name=avaliar_veiculo_troca e informe esses campos em tool_arguments.\n\n"
        f"Contexto: {user_context}\n"
        f"Mensagem do usuario: {message}"
    )
    preferred_models = getattr(self.llm, "bundle_model_names", None)
    bundle_generation_config = {
        "candidate_count": 1,
        "temperature": 0,
        "max_output_tokens": 768,
    }
    for attempt in range(2):
        try:
            result = await self.llm.generate_response(
                message=prompt,
                tools=[],
                # Dedicated bundle models only on the first try; fall back
                # to the default model set on the retry.
                preferred_models=preferred_models if attempt == 0 else None,
                generation_config=bundle_generation_config,
            )
            text = (result.get("response") or "").strip()
            payload = self.normalizer.parse_json_object(text)
            if not isinstance(payload, dict):
                if attempt == 0:
                    logger.warning("Bundle estruturado invalido (nao JSON objeto); repetindo uma vez. user_id=%s", user_id)
                    continue
                # Second attempt also unusable: give up without touching payload.
                break
            raw_turn_decision = payload.get("turn_decision")
            raw_message_plan = payload.get("message_plan")
            has_turn_decision = isinstance(raw_turn_decision, dict) and any(
                key in raw_turn_decision
                for key in (
                    "intent",
                    "domain",
                    "action",
                    "entities",
                    "tool_name",
                    "tool_arguments",
                    "response_to_user",
                    "missing_fields",
                    "selection_index",
                )
            )
            raw_orders = raw_message_plan.get("orders") if isinstance(raw_message_plan, dict) else None
            has_message_plan = isinstance(raw_orders, list) and len(raw_orders) > 0
            bundle = {
                "turn_decision": self.normalizer.coerce_turn_decision(raw_turn_decision),
                "message_plan": self.normalizer.coerce_message_plan(raw_message_plan, message=message),
                "has_turn_decision": has_turn_decision,
                "has_message_plan": has_message_plan,
            }
            # A partially useful bundle is still worth returning.
            if has_turn_decision or has_message_plan:
                return bundle
            if attempt == 0:
                logger.warning(
                    "Bundle estruturado incompleto; repetindo uma vez. user_id=%s has_turn_decision=%s has_message_plan=%s",
                    user_id,
                    has_turn_decision,
                    has_message_plan,
                )
        except Exception:
            logger.exception("Falha ao extrair bundle estruturado com LLM. user_id=%s", user_id)
            break
    return {
        "turn_decision": default_turn_decision,
        "message_plan": default_message_plan,
        "has_turn_decision": False,
        "has_message_plan": False,
    }
async def extract_routing(self, message: str, user_id: int | None) -> dict:
plan = await self.extract_message_plan(message=message, user_id=user_id)
return {
@ -70,7 +199,6 @@ class MessagePlanner:
for item in plan.get("orders", [])
]
}
async def extract_entities(self, message: str, user_id: int | None) -> dict:
user_context = f"user_id={user_id}" if user_id is not None else "user_id=anonimo"
prompt = (
@ -193,6 +321,8 @@ class MessagePlanner:
"- Em pedidos com tipo de carro (ex.: suv, sedan, hatch, pickup), preencha entities.generic_memory.perfil_veiculo.\n"
"- Se o usuario quiser efetivar a compra de um veiculo, use intent='order_create', domain='sales' e prefira tool_name='realizar_pedido'.\n"
"- Se o usuario quiser listar os pedidos dele, use intent='order_list', domain='sales', action='call_tool' e tool_name='listar_pedidos'.\n"
"- Se o usuario quiser consultar, listar, buscar ou ver veiculos/estoque para compra, use intent='inventory_search', domain='sales', action='call_tool' e tool_name='consultar_estoque'.\n"
"- Em consultar_estoque, preencha tool_arguments apenas com filtros claramente expressos pelo usuario, como preco_max, categoria e opcionalmente limite.\n"
"- Se o usuario quiser listar agendamentos de revisao, use intent='review_list', domain='review', action='call_tool' e tool_name='listar_agendamentos_revisao'.\n"
"- Se o usuario quiser cancelar um agendamento de revisao, use intent='review_cancel', domain='review' e prefira tool_name='cancelar_agendamento_revisao'.\n"
"- Se o usuario quiser remarcar um agendamento de revisao, use intent='review_reschedule', domain='review' e prefira tool_name='editar_data_revisao'.\n"
@ -251,3 +381,5 @@ class MessagePlanner:
"cancel_order_fields": self.normalizer.normalize_cancel_order_fields(coerced.get("cancel_order_fields")),
"intents": self.normalizer.normalize_intents(coerced.get("intents")),
}

@ -227,6 +227,7 @@ class OrchestratorContextManager:
if isinstance(context, dict):
context["last_rental_results"] = []
context["selected_rental_vehicle"] = None
context.pop("last_rental_search_payload", None)
if context.get("active_task") == "rental_create":
context["active_task"] = None
if str(context.get("active_domain") or "").strip().lower() == "rental":

@ -1,4 +1,4 @@
import json
import json
import logging
import re
from datetime import datetime, timedelta
@ -47,6 +47,7 @@ from app.services.flows.review_flow import ReviewFlowMixin
from app.services.orchestration.tool_executor import ToolExecutor
from app.services.tools.tool_registry import ToolRegistry
from app.services.orchestration.response_formatter import format_currency_br, format_datetime_for_chat
from app.services.orchestration.technical_normalizer import extract_budget_from_text, normalize_vehicle_profile
logger = logging.getLogger(__name__)
@ -220,12 +221,36 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin, RentalFlowMixin):
)
if current_rental_info:
return current_rental_info
deterministic_rental_bootstrap = await self._try_handle_deterministic_rental_bootstrap(
message=message,
user_id=user_id,
finish=finish,
)
if deterministic_rental_bootstrap:
return deterministic_rental_bootstrap
# Faz uma leitura inicial do turno para ajudar a policy
# com fila, troca de contexto e comandos globais.
early_turn_decision = await self._extract_turn_decision_with_llm(
message=message,
user_id=user_id,
)
use_turn_bundle = self._should_attempt_turn_bundle(
message=message,
early_turn_decision=early_turn_decision,
)
turn_bundle = (
await self._extract_turn_bundle_with_llm(
message=message,
user_id=user_id,
)
if use_turn_bundle
else None
)
bundle_has_message_plan = (
isinstance(turn_bundle, dict)
and bool(turn_bundle.get("has_message_plan"))
and isinstance(turn_bundle.get("message_plan"), dict)
)
reset_override = await self._try_handle_immediate_context_reset(
message=message,
user_id=user_id,
@ -259,10 +284,28 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin, RentalFlowMixin):
return deterministic_rental_management
message_plan = await self._extract_message_plan_with_llm(
synthesized_message_plan = (
self._synthesize_message_plan_from_turn_decision(
message=message,
turn_decision=early_turn_decision,
)
if not bundle_has_message_plan
and self._can_synthesize_message_plan_from_turn_decision(
message=message,
turn_decision=early_turn_decision,
)
else None
)
message_plan = (
turn_bundle.get("message_plan")
if bundle_has_message_plan
else synthesized_message_plan
if isinstance(synthesized_message_plan, dict)
else await self._extract_message_plan_with_llm(
message=message,
user_id=user_id,
)
)
routing_plan = {
"orders": [
{
@ -299,6 +342,13 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin, RentalFlowMixin):
user_id=user_id,
)
self._capture_turn_decision_trace(turn_decision)
decision_entities = self._extracted_entities_from_turn_decision(turn_decision)
if self._has_useful_extraction(decision_entities):
extracted_entities = self._merge_extracted_entities(
extracted_entities,
decision_entities,
)
if not self._has_useful_extraction(extracted_entities):
llm_extracted_entities = await self._extract_entities_with_llm(
message=routing_message,
user_id=user_id,
@ -307,10 +357,18 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin, RentalFlowMixin):
extracted_entities,
llm_extracted_entities,
)
if self._has_useful_turn_decision(turn_decision):
extracted_entities = self._merge_extracted_entities(
extracted_entities,
self._extracted_entities_from_turn_decision(turn_decision),
else:
started_at = perf_counter()
self._emit_turn_stage_metric(
"extract_entities_short_circuit",
started_at,
has_message_plan_entities=self._has_useful_extraction(
self._resolve_entities_for_message_plan(
message_plan=message_plan,
routed_message=routing_message,
)
),
has_turn_decision_entities=self._has_useful_extraction(decision_entities),
)
self._capture_generic_memory(
user_id=user_id,
@ -383,6 +441,7 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin, RentalFlowMixin):
self._update_active_domain(user_id=user_id, domain_hint=domain_hint)
reusable_router_result = None
orchestration_override = await self._try_execute_orchestration_control_tool(
message=routing_message,
user_id=user_id,
@ -391,7 +450,12 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin, RentalFlowMixin):
queue_notice=queue_notice,
finish=finish,
)
if orchestration_override:
if isinstance(orchestration_override, dict):
reusable_router_result = orchestration_override.get("llm_result")
handled_response = orchestration_override.get("handled_response")
if handled_response:
return handled_response
elif orchestration_override:
return orchestration_override
trade_in_response = await self._try_handle_trade_in_evaluation(
@ -505,6 +569,8 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin, RentalFlowMixin):
tools = self.registry.get_tools()
llm_result = reusable_router_result
if not isinstance(llm_result, dict):
llm_result = await self._call_llm_with_trace(
operation="router",
message=self._build_router_prompt(user_message=routing_message, user_id=user_id),
@ -560,6 +626,11 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin, RentalFlowMixin):
tool_result=tool_result,
user_id=user_id,
)
if tool_name == "consultar_frota_aluguel":
self._seed_pending_rental_draft_from_message(
message=routing_message,
user_id=user_id,
)
if self._should_use_deterministic_response(tool_name):
return await finish(
@ -615,7 +686,7 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin, RentalFlowMixin):
extracted_entities: dict,
queue_notice: str | None,
finish,
) -> str | None:
) -> str | dict | None:
decision = turn_decision or {}
decision_action = str(decision.get("action") or "").strip()
decision_tool_name = str(decision.get("tool_name") or "").strip()
@ -647,6 +718,9 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin, RentalFlowMixin):
queue_notice=queue_notice,
)
if self._should_skip_orchestration_control_router(turn_decision=decision):
return None
tools = self.registry.get_tools()
llm_result = await self._call_llm_with_trace(
operation="orchestration_router",
@ -687,8 +761,12 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin, RentalFlowMixin):
)
and self._is_low_value_response(first_pass_text)
)
reusable_first_pass = self._build_reusable_router_result_payload(
llm_result=llm_result,
source="orchestration_router",
)
if not should_force_tool:
return None
return reusable_first_pass
llm_result = await self._call_llm_with_trace(
operation="orchestration_force_tool",
@ -698,7 +776,10 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin, RentalFlowMixin):
forced_tool_call = llm_result.get("tool_call") or {}
forced_tool_name = forced_tool_call.get("name")
if forced_tool_name not in ORCHESTRATION_CONTROL_TOOLS:
return None
return self._build_reusable_router_result_payload(
llm_result=llm_result,
source="orchestration_force_tool",
)
if (
forced_tool_name == "cancelar_fluxo_atual"
and self.policy.should_defer_flow_cancellation_control(message=message, user_id=user_id)
@ -1406,6 +1487,48 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin, RentalFlowMixin):
queue_notice=queue_notice,
)
async def _try_handle_deterministic_rental_bootstrap(
    self,
    message: str,
    user_id: int | None,
    finish,
) -> str | None:
    """Short-circuit obvious rental requests before any LLM routing.

    Bails out when the message looks like rental management/payment or like
    a sales-domain request; otherwise drives the deterministic rental
    collect/open flow and finalizes the response through `finish`.
    Returns None when the turn should proceed through normal routing.
    """
    if user_id is None:
        return None
    looks_like_rental_management = (
        self._has_rental_return_management_request(message, user_id=user_id)
        or self._has_rental_payment_or_fine_request(message)
    )
    if looks_like_rental_management:
        return None
    looks_like_sales = (
        self._has_explicit_order_request(message)
        or self._has_stock_listing_request(message)
        or self._has_order_listing_request(message)
        or self._has_trade_in_evaluation_request(message)
    )
    if looks_like_sales:
        return None
    wants_create = self._has_explicit_rental_request(message)
    wants_list = self._has_rental_listing_request(message)
    if not (wants_create or wants_list):
        return None
    turn_decision = {
        "intent": "rental_create" if wants_create else "rental_list",
        "domain": "rental",
        "action": "collect_rental_create" if wants_create else "collect_rental_list",
    }
    response = await self._try_collect_and_open_rental(
        message=message,
        user_id=user_id,
        extracted_fields={},
        intents={},
        turn_decision=turn_decision,
    )
    return await finish(response) if response else None
async def _try_handle_active_sales_follow_up(
self,
message: str,
@ -1612,6 +1735,11 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin, RentalFlowMixin):
tool_result=tool_result,
user_id=user_id,
)
if tool_name == "consultar_frota_aluguel":
self._seed_pending_rental_draft_from_message(
message=message,
user_id=user_id,
)
if self._should_use_deterministic_response(tool_name):
return await finish(
@ -1686,7 +1814,9 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin, RentalFlowMixin):
# Configure the system's volatile (in-memory) per-user context.
def _upsert_user_context(self, user_id: int | None) -> None:
    """Create or refresh the volatile per-user context, timing the stage."""
    stage_started = perf_counter()
    self._context_manager.upsert_user_context(user_id=user_id)
    self._emit_turn_stage_metric("upsert_user_context", stage_started)
def _get_user_context(self, user_id: int | None) -> dict | None:
    """Return the volatile context dict for the user, or None when absent."""
    return self._context_manager.get_user_context(user_id)
@ -1738,6 +1868,7 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin, RentalFlowMixin):
user = self._get_user_record(user_id=user_id)
if not user or not getattr(user, "email", None):
return
started_at = perf_counter()
try:
sync_user_email_integration_routes(
user_id=user.id,
@ -1745,6 +1876,11 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin, RentalFlowMixin):
recipient_name=user.name,
)
self._user_profile_routes_ready = True
self._emit_turn_stage_metric(
"ensure_user_email_routes",
started_at,
synced_routes_count=6,
)
except Exception:
logger.exception(
"Falha ao sincronizar rotas de email do usuario.",
@ -1952,17 +2088,104 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin, RentalFlowMixin):
def _coerce_extraction_contract(self, payload) -> dict:
    """Delegate to the normalizer to coerce an extraction payload into the expected contract dict."""
    return self.normalizer.coerce_extraction_contract(payload)
async def _extract_turn_bundle_with_llm(self, message: str, user_id: int | None) -> dict | None:
    """Extract the combined turn bundle via the planner, with a latency metric.

    Returns the planner's bundle dict or None when the planner is missing,
    does not support bundles, or returns a non-dict.

    Fix: the metric payload previously mixed `(result or {}).get` with an
    isinstance-guarded expression — a truthy non-dict result (e.g. a list)
    would raise AttributeError. The result is now normalized once before
    building the metric payload.
    """
    planner = getattr(self, "planner", None)
    if planner is None or not hasattr(planner, "extract_turn_bundle"):
        return None
    started_at = perf_counter()
    result = await planner.extract_turn_bundle(message=message, user_id=user_id)
    bundle = result if isinstance(result, dict) else None
    message_plan = (bundle or {}).get("message_plan")
    orders = message_plan.get("orders") if isinstance(message_plan, dict) else None
    self._emit_turn_stage_metric(
        "extract_turn_bundle",
        started_at,
        has_turn_decision=bool((bundle or {}).get("has_turn_decision")),
        has_message_plan=bool((bundle or {}).get("has_message_plan")),
        order_count=len(orders or []),
    )
    return bundle
async def _extract_message_plan_with_llm(self, message: str, user_id: int | None) -> dict:
    """Extract the message plan via the planner, emitting a latency metric.

    Fix: the merge left the old one-line ``return await ...`` in front of
    the instrumented body, making the metric code unreachable; the stale
    early return is removed.
    """
    started_at = perf_counter()
    result = await self.planner.extract_message_plan(message=message, user_id=user_id)
    self._emit_turn_stage_metric(
        "extract_message_plan",
        started_at,
        order_count=len(result.get("orders") or []) if isinstance(result, dict) else 0,
    )
    return result
async def _extract_routing_with_llm(self, message: str, user_id: int | None) -> dict:
    """Extract routing orders via the planner, emitting a latency metric.

    Fix: removed the stale pre-merge one-line ``return await ...`` that
    made the instrumentation below unreachable.
    """
    started_at = perf_counter()
    result = await self.planner.extract_routing(message=message, user_id=user_id)
    self._emit_turn_stage_metric(
        "extract_routing",
        started_at,
        order_count=len(result.get("orders") or []) if isinstance(result, dict) else 0,
    )
    return result
async def _extract_entities_with_llm(self, message: str, user_id: int | None) -> dict:
    """Extract turn entities via the planner, emitting a latency metric.

    Fix: removed the stale pre-merge one-line ``return await ...`` that
    made the instrumentation below unreachable.
    """
    started_at = perf_counter()
    result = await self.planner.extract_entities(message=message, user_id=user_id)
    self._emit_turn_stage_metric(
        "extract_entities",
        started_at,
        has_generic_memory=bool((result or {}).get("generic_memory")),
        # Only report review fields that carry a real value.
        review_field_keys=[
            key
            for key, value in ((result or {}).get("review_fields") or {}).items()
            if value not in (None, "", [], {})
        ],
    )
    return result
async def _extract_sales_search_context_with_llm(self, message: str, user_id: int | None) -> dict:
    """Extract sales search context via the planner, emitting a latency metric.

    Fix: removed the stale pre-merge one-line ``return await ...`` that
    made the instrumentation below unreachable.
    """
    started_at = perf_counter()
    result = await self.planner.extract_sales_search_context(message=message, user_id=user_id)
    self._emit_turn_stage_metric(
        "extract_sales_search_context",
        started_at,
        has_budget=bool((result or {}).get("orcamento_max")),
        profile_count=len((result or {}).get("perfil_veiculo") or []),
    )
    return result
def _extract_sales_search_context_deterministically(self, message: str) -> dict:
    """Derive sales search filters from the message without an LLM call.

    Extracts a budget ceiling (``orcamento_max``) and a vehicle profile
    (``perfil_veiculo``) using regex heuristics. Emits a short-circuit
    stage metric only when something was extracted.
    """
    stage_started = perf_counter()
    candidate = str(message or "").strip()
    if not candidate:
        return {}
    extracted: dict[str, object] = {}
    budget = extract_budget_from_text(candidate)
    if budget:
        extracted["orcamento_max"] = int(round(budget))
    normalized_message = self._normalize_text(candidate)
    profile_hits: list[str] = []
    profile_patterns = (
        (r"\bsuv\b", "suv"),
        (r"\bsedan\b", "sedan"),
        (r"\bhatch\b", "hatch"),
        (r"\bpickup\b", "pickup"),
        # "picape" maps onto the same canonical profile as "pickup".
        (r"\bpicape\b", "pickup"),
    )
    for pattern, canonical in profile_patterns:
        if canonical not in profile_hits and re.search(pattern, normalized_message):
            profile_hits.append(canonical)
    profile = normalize_vehicle_profile(profile_hits)
    if profile:
        extracted["perfil_veiculo"] = profile
    if extracted:
        self._emit_turn_stage_metric(
            "extract_sales_search_context_short_circuit",
            stage_started,
            source="technical",
            has_budget=bool(extracted.get("orcamento_max")),
            profile_count=len(extracted.get("perfil_veiculo") or []),
        )
    return extracted
async def _extract_missing_sales_search_context_with_llm(
self,
@ -1974,7 +2197,9 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin, RentalFlowMixin):
decision = turn_decision or {}
decision_intent = str(decision.get("intent") or "").strip().lower()
decision_domain = str(decision.get("domain") or "").strip().lower()
if decision_domain != "sales" and decision_intent not in {"order_create", "order_list", "inventory_search"}:
if decision_intent not in {"order_create", "inventory_search"} and decision_domain != "sales":
return {}
if decision_intent not in {"order_create", "inventory_search"}:
return {}
generic_memory = (extracted_entities or {}).get("generic_memory")
@ -1982,10 +2207,20 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin, RentalFlowMixin):
generic_memory = {}
if generic_memory.get("orcamento_max") or generic_memory.get("perfil_veiculo"):
return {}
return await self._extract_sales_search_context_with_llm(message=message, user_id=user_id)
return self._extract_sales_search_context_deterministically(message)
async def _extract_turn_decision_with_llm(self, message: str, user_id: int | None) -> dict:
    """Extract the turn decision via the planner, emitting a latency metric.

    Fix: removed the stale pre-merge one-line ``return await ...`` that
    made the instrumentation below unreachable.
    """
    started_at = perf_counter()
    result = await self.planner.extract_turn_decision(message=message, user_id=user_id)
    self._emit_turn_stage_metric(
        "extract_turn_decision",
        started_at,
        intent=str((result or {}).get("intent") or ""),
        action=str((result or {}).get("action") or ""),
        domain=str((result or {}).get("domain") or ""),
    )
    return result
async def _try_handle_immediate_context_reset(
self,
@ -2042,6 +2277,14 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin, RentalFlowMixin):
def _has_useful_turn_decision(self, turn_decision: dict | None) -> bool:
if not isinstance(turn_decision, dict):
return False
if str(turn_decision.get("response_to_user") or "").strip():
return True
if turn_decision.get("selection_index") is not None:
return True
if str(turn_decision.get("tool_name") or "").strip():
return True
if turn_decision.get("missing_fields"):
return True
if (turn_decision.get("intent") or "general") != "general":
return True
if (turn_decision.get("action") or "answer_user") != "answer_user":
@ -2049,11 +2292,140 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin, RentalFlowMixin):
entities = turn_decision.get("entities")
return self._has_useful_extraction(self._extracted_entities_from_turn_decision(turn_decision)) if isinstance(entities, dict) else False
def _should_attempt_turn_bundle(self, message: str, early_turn_decision: dict | None) -> bool:
# O bundle ficou caro e instavel nas amostras atuais.
# Mantemos o caminho desabilitado por padrao e deixamos opt-in
# para cenarios/testes onde ainda queremos exercita-lo.
return False
def _can_synthesize_message_plan_from_turn_decision(self, message: str, turn_decision: dict | None) -> bool:
if not str(message or "").strip():
return False
if str((turn_decision or {}).get("action") or "").strip().lower() != "call_tool":
return False
normalized_tool_name = self.normalizer.normalize_tool_name((turn_decision or {}).get("tool_name"))
if normalized_tool_name not in {"consultar_estoque", "avaliar_veiculo_troca"}:
return False
if self._has_message_plan_synthesis_conflict(
message=message,
turn_decision=turn_decision,
normalized_tool_name=normalized_tool_name,
):
return False
if normalized_tool_name == "consultar_estoque":
return self._has_stock_listing_request(message, turn_decision=turn_decision)
return self._has_trade_in_evaluation_request(message, turn_decision=turn_decision)
def _has_message_plan_synthesis_conflict(
    self,
    message: str,
    turn_decision: dict | None,
    normalized_tool_name: str,
) -> bool:
    """Return True when *message* should NOT be collapsed into a single-tool plan.

    Conflicts detected:
    - the message activates more than one actionable domain (sales/review/rental);
    - it also asks for an order listing;
    - for ``consultar_estoque``: it also asks for a trade-in evaluation;
    - for other tools: it also asks for a stock listing or an explicit order.

    An empty/blank message is never a conflict (there is nothing to clash with).
    """
    normalized_message = self._normalize_text(message).strip()
    if not normalized_message:
        return False
    # Seed a minimal sales order so the policy can expand the raw message
    # into every actionable order it implies.
    seed_order = {
        "domain": "sales",
        "message": str(message or "").strip(),
        "entities": self._empty_extraction_payload(),
    }
    augmented_orders = [seed_order]
    # NOTE(review): policy may be absent on some harnesses; fall back to the seed alone.
    if hasattr(self, "policy") and self.policy is not None:
        augmented_orders = self.policy.augment_actionable_orders_from_message(
            message=message,
            extracted_orders=[seed_order],
        )
    actionable_domains = {
        str(order.get("domain") or "general")
        for order in augmented_orders
        if isinstance(order, dict)
    }
    # More than one actionable domain means a single-tool plan would drop work.
    if len(actionable_domains & {"sales", "review", "rental"}) > 1:
        return True
    if self._has_order_listing_request(message=message, turn_decision=turn_decision):
        return True
    if normalized_tool_name == "consultar_estoque":
        return self._has_trade_in_evaluation_request(message, turn_decision=turn_decision)
    return (
        self._has_stock_listing_request(message=message, turn_decision=turn_decision)
        or self._has_explicit_order_request(message)
    )
def _synthesize_message_plan_from_turn_decision(self, message: str, turn_decision: dict | None) -> dict:
domain = self._domain_from_turn_decision(turn_decision)
normalized_tool_name = self.normalizer.normalize_tool_name((turn_decision or {}).get("tool_name"))
if domain == "general" and normalized_tool_name in {"consultar_estoque", "avaliar_veiculo_troca"}:
domain = "sales"
extracted_entities = self._merge_extracted_entities(
self._empty_extraction_payload(),
self._extracted_entities_from_turn_decision(turn_decision),
)
return {
"orders": [
{
"domain": domain,
"message": str(message or "").strip(),
"entities": extracted_entities,
}
]
}
def _build_reusable_router_result_payload(self, llm_result: dict | None, source: str) -> dict | None:
if not isinstance(llm_result, dict):
return None
tool_call = llm_result.get("tool_call") or {}
tool_name = str(tool_call.get("name") or "").strip()
if tool_name and tool_name not in ORCHESTRATION_CONTROL_TOOLS:
return {"llm_result": llm_result, "source": source}
response_text = str(llm_result.get("response") or "").strip()
if response_text and not self._is_low_value_response(response_text):
return {"llm_result": llm_result, "source": source}
return None
def _should_skip_orchestration_control_router(self, turn_decision: dict | None) -> bool:
    """Return True when the turn decision is decisive enough to bypass the control router.

    The router is still required for control tools and context-control actions;
    everything else with a concrete tool call, a direct intent, or a
    sales/review collection action can skip it.
    """
    decision = turn_decision or {}
    action = str(decision.get("action") or "").strip().lower()
    intent = str(decision.get("intent") or "").strip().lower()
    domain = str(decision.get("domain") or "").strip().lower()
    tool = self.normalizer.normalize_tool_name(decision.get("tool_name"))
    # Control tools and context-control actions must go through the router.
    if tool in ORCHESTRATION_CONTROL_TOOLS:
        return False
    if action in {"clear_context", "continue_queue", "discard_queue", "cancel_active_flow"}:
        return False
    # A concrete tool call can be executed directly.
    if action == "call_tool" and tool:
        return True
    direct_intents = {
        "order_create",
        "inventory_search",
        "order_list",
        "order_cancel",
        "review_schedule",
        "review_list",
        "review_cancel",
        "review_reschedule",
    }
    if intent in direct_intents:
        return True
    collect_actions = {
        "ask_missing_fields",
        "collect_review_schedule",
        "collect_review_management",
        "collect_order_create",
        "collect_order_cancel",
    }
    return domain in {"sales", "review"} and action in collect_actions
def _extracted_entities_from_turn_decision(self, turn_decision: dict | None) -> dict:
entities = (turn_decision or {}).get("entities")
if not isinstance(entities, dict):
entities = {}
return {
extracted = {
"generic_memory": entities.get("generic_memory", {}),
"review_fields": entities.get("review_fields", {}),
"review_management_fields": entities.get("review_management_fields", {}),
@ -2062,6 +2434,43 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin, RentalFlowMixin):
"intents": {},
}
normalized_tool_name = self.normalizer.normalize_tool_name((turn_decision or {}).get("tool_name"))
raw_tool_arguments = (turn_decision or {}).get("tool_arguments")
if normalized_tool_name == "avaliar_veiculo_troca" and isinstance(raw_tool_arguments, dict):
normalized_arguments = self.normalizer.normalize_tool_arguments(
"avaliar_veiculo_troca",
raw_tool_arguments,
)
if normalized_arguments:
review_fields = extracted.get("review_fields")
if not isinstance(review_fields, dict):
review_fields = {}
for field in ("modelo", "ano", "km"):
value = normalized_arguments.get(field)
if value not in (None, "", [], {}):
review_fields[field] = value
extracted["review_fields"] = review_fields
if normalized_tool_name == "consultar_estoque" and isinstance(raw_tool_arguments, dict):
normalized_arguments = self.normalizer.normalize_tool_arguments(
"consultar_estoque",
raw_tool_arguments,
)
if normalized_arguments:
generic_memory = extracted.get("generic_memory")
if not isinstance(generic_memory, dict):
generic_memory = {}
budget = normalized_arguments.get("preco_max")
if budget not in (None, "", [], {}):
generic_memory["orcamento_max"] = int(round(float(budget)))
category = str(normalized_arguments.get("categoria") or "").strip().lower()
if category:
existing_profiles = normalize_vehicle_profile(generic_memory.get("perfil_veiculo"))
generic_memory["perfil_veiculo"] = normalize_vehicle_profile([*existing_profiles, category])
extracted["generic_memory"] = generic_memory
return extracted
def _merge_extracted_entities(self, base: dict | None, override: dict | None) -> dict:
merged = self._empty_extraction_payload()
for section in ("generic_memory", "review_fields", "review_management_fields", "order_fields", "cancel_order_fields"):
@ -2643,17 +3052,20 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin, RentalFlowMixin):
exc: HTTPException,
user_id: int | None,
) -> None:
if tool_name != "agendar_revisao" or user_id is None or exc.status_code != 409:
if tool_name not in {"agendar_revisao", "editar_data_revisao"} or user_id is None or exc.status_code != 409:
return
detail = exc.detail if isinstance(exc.detail, dict) else {}
suggested_iso = str(detail.get("suggested_iso") or "").strip()
if not suggested_iso:
return
payload = dict(arguments or {})
if not payload.get("placa"):
datetime_field = "nova_data_hora" if tool_name == "editar_data_revisao" else "data_hora"
required_field = "protocolo" if tool_name == "editar_data_revisao" else "placa"
if not payload.get(required_field):
return
payload["data_hora"] = suggested_iso
payload[datetime_field] = suggested_iso
self.state.set_entry("pending_review_confirmations", user_id, {
"tool_name": tool_name,
"payload": payload,
"expires_at": utc_now() + timedelta(minutes=PENDING_REVIEW_TTL_MINUTES),
})
@ -2773,6 +3185,14 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin, RentalFlowMixin):
def _format_turn_error(self, exc: Exception) -> str:
    """Delegate user-facing formatting of a turn-level exception to the execution manager."""
    return self._execution_manager.format_turn_error(exc)
def _emit_turn_stage_metric(self, operation: str, started_at: float, **payload) -> None:
self._log_turn_event(
"turn_stage_completed",
operation=operation,
elapsed_ms=round((perf_counter() - started_at) * 1000, 2),
**payload,
)
def _log_turn_event(self, event: str, **payload) -> None:
    """Forward a structured turn event (name + keyword payload) to the execution manager's logger."""
    self._execution_manager.log_turn_event(event, **payload)
@ -2870,17 +3290,20 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin, RentalFlowMixin):
exc: HTTPException,
user_id: int | None,
) -> None:
if tool_name != "agendar_revisao" or user_id is None or exc.status_code != 409:
if tool_name not in {"agendar_revisao", "editar_data_revisao"} or user_id is None or exc.status_code != 409:
return
detail = exc.detail if isinstance(exc.detail, dict) else {}
suggested_iso = str(detail.get("suggested_iso") or "").strip()
if not suggested_iso:
return
payload = dict(arguments or {})
if not payload.get("placa"):
datetime_field = "nova_data_hora" if tool_name == "editar_data_revisao" else "data_hora"
required_field = "protocolo" if tool_name == "editar_data_revisao" else "placa"
if not payload.get(required_field):
return
payload["data_hora"] = suggested_iso
payload[datetime_field] = suggested_iso
self.state.set_entry("pending_review_confirmations", user_id, {
"tool_name": tool_name,
"payload": payload,
"expires_at": utc_now() + timedelta(minutes=PENDING_REVIEW_TTL_MINUTES),
})
@ -2897,28 +3320,39 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin, RentalFlowMixin):
if not pending:
return None
pending_tool_name = str(pending.get("tool_name") or "agendar_revisao").strip() or "agendar_revisao"
datetime_field = "nova_data_hora" if pending_tool_name == "editar_data_revisao" else "data_hora"
normalized_schedule_fields = self._normalize_review_fields(extracted_review_fields)
normalized_management_fields = self._normalize_review_management_fields(extracted_review_fields)
normalized_message_datetime = None if self._is_affirmative_message(message) else self._normalize_review_datetime_text(message)
time_only = self._extract_time_only(message)
if self._is_negative_message(message) or time_only:
extracted = self._normalize_review_fields(extracted_review_fields)
new_data_hora = extracted.get("data_hora")
new_data_hora = (
normalized_management_fields.get("nova_data_hora")
if pending_tool_name == "editar_data_revisao"
else normalized_schedule_fields.get("data_hora")
)
if not new_data_hora and normalized_message_datetime:
new_data_hora = normalized_message_datetime
if not new_data_hora and time_only:
new_data_hora = self._merge_date_with_time(pending["payload"].get("data_hora", ""), time_only)
new_data_hora = self._merge_date_with_time(pending["payload"].get(datetime_field, ""), time_only)
if self._is_negative_message(message) or time_only or (new_data_hora and not self._is_affirmative_message(message)):
if not new_data_hora:
self.state.pop_entry("pending_review_confirmations", user_id)
return "Sem problema. Me informe a nova data e hora desejada para a revisao."
payload = dict(pending["payload"])
payload["data_hora"] = new_data_hora
payload[datetime_field] = new_data_hora
try:
tool_result = await self.tool_executor.execute(
"agendar_revisao",
pending_tool_name,
payload,
user_id=user_id,
)
except HTTPException as exc:
self.state.pop_entry("pending_review_confirmations", user_id)
self._capture_review_confirmation_suggestion(
tool_name="agendar_revisao",
tool_name=pending_tool_name,
arguments=payload,
exc=exc,
user_id=user_id,
@ -2926,24 +3360,32 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin, RentalFlowMixin):
return self._http_exception_detail(exc)
self._reset_pending_review_states(user_id=user_id)
if pending_tool_name == "agendar_revisao":
self._store_last_review_package(user_id=user_id, payload=payload)
return self._fallback_format_tool_result("agendar_revisao", tool_result)
return self._fallback_format_tool_result(pending_tool_name, tool_result)
if not self._is_affirmative_message(message):
return None
try:
tool_result = await self.tool_executor.execute(
"agendar_revisao",
pending_tool_name,
pending["payload"],
user_id=user_id,
)
except HTTPException as exc:
self.state.pop_entry("pending_review_confirmations", user_id)
self._capture_review_confirmation_suggestion(
tool_name=pending_tool_name,
arguments=pending.get("payload") or {},
exc=exc,
user_id=user_id,
)
return self._http_exception_detail(exc)
self._reset_pending_review_states(user_id=user_id)
if pending_tool_name == "agendar_revisao":
self._store_last_review_package(user_id=user_id, payload=pending.get("payload"))
return self._fallback_format_tool_result("agendar_revisao", tool_result)
return self._fallback_format_tool_result(pending_tool_name, tool_result)
def _http_exception_detail(self, exc: HTTPException) -> str:
    """Extract a user-presentable message from an HTTPException via the execution manager."""
    return self._execution_manager.http_exception_detail(exc)
@ -2953,3 +3395,5 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin, RentalFlowMixin):
tool_name=tool_name,
tool_result=tool_result,
)

@ -1,4 +1,4 @@
import re
import re
import unicodedata
from datetime import datetime, timedelta
@ -92,7 +92,7 @@ def extract_budget_from_text(text: str) -> float | None:
normalized = normalize_text(candidate)
keyword_match = re.search(
r"(?:ate|até|de|por|orcamento|orçamento)\s+(\d[\d\.\,\s]{1,12})(?!\d)",
r"(?:ate|de|por|orcamento)\s+(\d[\d\.\,\s]{1,12})(?!\d)",
normalized,
flags=re.IGNORECASE,
)
@ -128,8 +128,12 @@ def normalize_bool(value) -> bool | None:
def normalize_datetime_connector(text: str) -> str:
    """Collapse short connector words between a date and a time into one space.

    E.g. ``"20/03/2026 as 10:00"`` or ``"20/03/2026 ate as 10:00"`` become
    ``"20/03/2026 10:00"``. Whitespace is compacted first; then up to two
    short non-numeric words (1-6 chars each) sitting between a
    dd/mm/yyyy-or-yyyy-mm-dd date and an HH:MM[:SS][timezone] time are
    dropped. Text without that date/time shape is returned compacted but
    otherwise unchanged.

    Fix: a stale single-line ``return`` (the pre-refactor implementation that
    stripped only "as/às") had been left above the new ``re.sub``, making the
    generalized connector handling unreachable. Only the new path is kept.
    """
    compact = " ".join(str(text or "").strip().split())
    return re.sub(
        r"((?:\d{1,2}[/-]\d{1,2}[/-]\d{4})|(?:\d{4}[/-]\d{1,2}[/-]\d{1,2}))\s+(?:[^\d\s]{1,6}\s+){1,2}(\d{1,2}:\d{2}(?::\d{2})?(?:\s*(?:Z|[+-]\d{2}:\d{2}))?)",
        r"\1 \2",
        compact,
        flags=re.IGNORECASE,
    ).strip()
def try_parse_iso_datetime(text: str) -> datetime | None:
candidate = str(text or "").strip()
@ -255,7 +259,9 @@ def normalize_review_datetime_text(value, now_provider=None) -> str | None:
normalized = normalize_text(text)
day_offset = None
if "amanha" in normalized or "tomorrow" in normalized:
if "depois de amanha" in normalized or "day after tomorrow" in normalized:
day_offset = 2
elif "amanha" in normalized or "tomorrow" in normalized:
day_offset = 1
elif "hoje" in normalized or "today" in normalized:
day_offset = 0
@ -320,3 +326,4 @@ def normalize_order_number(value) -> str | None:
if order_number and re.fullmatch(r"PED-[A-Z0-9\\-]+", order_number):
return order_number
return None

@ -261,12 +261,12 @@ class OrderFlowHarness(OrderFlowMixin):
class RentalFlowHarness(RentalFlowMixin):
def __init__(self, state, registry):
def __init__(self, state, registry, rental_now_provider=None):
self.state = state
self.registry = registry
self.tool_executor = registry
self.normalizer = EntityNormalizer()
self._rental_now_provider = rental_now_provider
def _get_user_context(self, user_id: int | None):
return self.state.get_user_context(user_id)
@ -1105,9 +1105,17 @@ class CreateOrderFlowWithVehicleTests(unittest.IsolatedAsyncioTestCase):
async def test_order_flow_accepts_turn_decision_without_legacy_intents(self):
state = FakeState(
entries={
"pending_order_drafts": {
10: {
"payload": {"cpf": "12345678909"},
"expires_at": utc_now() + timedelta(minutes=30),
}
}
},
contexts={
10: {
"generic_memory": {"cpf": "12345678909"},
"generic_memory": {},
"last_stock_results": [
{"id": 1, "modelo": "Honda Civic 2021", "categoria": "sedan", "preco": 51524.0},
],
@ -1143,9 +1151,17 @@ class CreateOrderFlowWithVehicleTests(unittest.IsolatedAsyncioTestCase):
async def test_order_flow_accepts_model_intent_without_keyword_trigger(self):
state = FakeState(
entries={
"pending_order_drafts": {
10: {
"payload": {"cpf": "12345678909"},
"expires_at": utc_now() + timedelta(minutes=30),
}
}
},
contexts={
10: {
"generic_memory": {"cpf": "12345678909"},
"generic_memory": {},
"last_stock_results": [
{"id": 1, "modelo": "Honda Civic 2021", "categoria": "sedan", "preco": 51524.0},
],
@ -1201,9 +1217,166 @@ class CreateOrderFlowWithVehicleTests(unittest.IsolatedAsyncioTestCase):
intents={"order_create": True},
)
self.assertIn("escolha primeiro qual veiculo", response.lower())
self.assertIn("Honda Civic 2021", response)
self.assertEqual(
response,
"Pode me dizer a faixa de preco, o modelo ou o tipo de carro que voce procura.",
)
self.assertEqual(registry.calls, [])
self.assertEqual(state.get_user_context(10)["last_stock_results"], [])
async def test_order_flow_generic_request_asks_for_price_range_even_with_previous_search_context(self):
    """A generic "quero fazer um pedido" restarts discovery: the flow asks for
    price range/model/type and wipes stale search context (results, selection,
    budget and vehicle-profile memory) instead of reusing it.
    """
    state = FakeState(
        contexts={
            10: {
                "generic_memory": {"cpf": "12345678909", "orcamento_max": 70000, "perfil_veiculo": ["suv"]},
                "shared_memory": {"orcamento_max": 70000, "perfil_veiculo": ["suv"]},
                "last_stock_results": [
                    {"id": 7, "modelo": "Fiat Argo 2020", "categoria": "suv", "preco": 61857.0},
                    {"id": 3, "modelo": "Chevrolet Onix 2022", "categoria": "suv", "preco": 51809.0},
                ],
                "selected_vehicle": {"id": 7, "modelo": "Fiat Argo 2020", "categoria": "suv", "preco": 61857.0},
                "pending_single_vehicle_confirmation": {"id": 7, "modelo": "Fiat Argo 2020", "categoria": "suv", "preco": 61857.0},
            }
        }
    )
    registry = FakeRegistry()
    flow = OrderFlowHarness(state=state, registry=registry)
    response = await flow._try_collect_and_create_order(
        message="Quero fazer um pedido de veiculo",
        user_id=10,
        extracted_fields={},
        intents={"order_create": True},
        turn_decision={"intent": "order_create", "domain": "sales", "action": "collect_order_create"},
    )
    draft = state.get_entry("pending_order_drafts", 10)
    context = state.get_user_context(10)
    # The flow asks the discovery question without calling any tool.
    self.assertEqual(
        response,
        "Pode me dizer a faixa de preco, o modelo ou o tipo de carro que voce procura.",
    )
    self.assertEqual(registry.calls, [])
    # An empty draft is opened and every piece of prior search context is cleared.
    self.assertIsNotNone(draft)
    self.assertEqual(draft["payload"], {})
    self.assertEqual(context["last_stock_results"], [])
    self.assertIsNone(context["selected_vehicle"])
    self.assertIsNone(context.get("pending_single_vehicle_confirmation"))
    self.assertNotIn("orcamento_max", context["generic_memory"])
    self.assertNotIn("perfil_veiculo", context["generic_memory"])
    self.assertNotIn("orcamento_max", context["shared_memory"])
    self.assertNotIn("perfil_veiculo", context["shared_memory"])
async def test_order_flow_requires_confirmation_before_using_known_cpf(self):
    """Selecting a vehicle with a CPF remembered from a previous turn first asks
    the user to confirm it; only after an affirmative reply is the order tool
    called (and the mock customer hydrated).
    """
    state = FakeState(
        contexts={
            10: {
                "generic_memory": {"cpf": "12345678909"},
                "last_stock_results": [
                    {"id": 1, "modelo": "Honda Civic 2021", "categoria": "sedan", "preco": 51524.0},
                ],
                "selected_vehicle": None,
            }
        }
    )
    registry = FakeRegistry()
    flow = OrderFlowHarness(state=state, registry=registry)
    hydrated = []
    async def fake_hydrate_mock_customer_from_cpf(cpf: str, user_id: int | None = None):
        hydrated.append((cpf, user_id))
        return {"cpf": cpf, "user_id": user_id}
    with patch(
        "app.services.flows.order_flow.hydrate_mock_customer_from_cpf",
        new=fake_hydrate_mock_customer_from_cpf,
    ):
        first_response = await flow._try_collect_and_create_order(
            message="1",
            user_id=10,
            extracted_fields={},
            intents={},
        )
        draft = state.get_entry("pending_order_drafts", 10)
        # First turn: draft holds vehicle + unconfirmed CPF, no tool call yet.
        self.assertIsNotNone(draft)
        self.assertEqual(draft["payload"].get("vehicle_id"), 1)
        self.assertEqual(draft["payload"].get("cpf"), "12345678909")
        self.assertIs(draft["payload"].get("cpf_confirmed"), False)
        self.assertIn("cpf informado anteriormente", first_response.lower())
        self.assertIn("continua correto", first_response.lower())
        self.assertEqual(registry.calls, [])
        self.assertEqual(hydrated, [])
        second_response = await flow._try_collect_and_create_order(
            message="sim",
            user_id=10,
            extracted_fields={},
            intents={},
        )
        # Affirmative reply: the order is created with the confirmed CPF.
        self.assertEqual(len(registry.calls), 1)
        self.assertEqual(registry.calls[0][0], "realizar_pedido")
        self.assertEqual(registry.calls[0][1]["vehicle_id"], 1)
        self.assertEqual(registry.calls[0][1]["cpf"], "12345678909")
        self.assertEqual(hydrated, [("12345678909", 10)])
        self.assertIn("Pedido criado com sucesso.", second_response)
async def test_order_flow_updates_known_cpf_after_negative_confirmation_and_new_value(self):
    """Rejecting the remembered CPF ("nao") makes the flow ask for the right one;
    supplying a new valid CPF then creates the order and updates both generic
    and shared memory with the corrected value.
    """
    state = FakeState(
        contexts={
            10: {
                "generic_memory": {"cpf": "12345678909"},
                "last_stock_results": [
                    {"id": 1, "modelo": "Honda Civic 2021", "categoria": "sedan", "preco": 51524.0},
                ],
                "selected_vehicle": None,
            }
        }
    )
    registry = FakeRegistry()
    flow = OrderFlowHarness(state=state, registry=registry)
    hydrated = []
    async def fake_hydrate_mock_customer_from_cpf(cpf: str, user_id: int | None = None):
        hydrated.append((cpf, user_id))
        return {"cpf": cpf, "user_id": user_id}
    with patch(
        "app.services.flows.order_flow.hydrate_mock_customer_from_cpf",
        new=fake_hydrate_mock_customer_from_cpf,
    ):
        await flow._try_collect_and_create_order(
            message="1",
            user_id=10,
            extracted_fields={},
            intents={},
        )
        second_response = await flow._try_collect_and_create_order(
            message="nao",
            user_id=10,
            extracted_fields={},
            intents={},
        )
        third_response = await flow._try_collect_and_create_order(
            message="52998224725",
            user_id=10,
            extracted_fields={},
            intents={},
        )
        # "nao" rejects the stored CPF and prompts for the correct one.
        self.assertIn("me informe o cpf correto", second_response.lower())
        # Only the final turn reaches the tool, using the corrected CPF.
        self.assertEqual(len(registry.calls), 1)
        self.assertEqual(registry.calls[0][0], "realizar_pedido")
        self.assertEqual(registry.calls[0][1]["vehicle_id"], 1)
        self.assertEqual(registry.calls[0][1]["cpf"], "52998224725")
        self.assertEqual(hydrated, [("52998224725", 10)])
        # Both memory scopes now hold the corrected CPF.
        self.assertEqual(state.get_user_context(10)["generic_memory"]["cpf"], "52998224725")
        self.assertEqual(state.get_user_context(10)["shared_memory"]["cpf"], "52998224725")
        self.assertIn("Pedido criado com sucesso.", third_response)
async def test_order_flow_single_stock_result_requires_explicit_confirmation(self):
state = FakeState(
@ -2003,6 +2176,186 @@ class RentalFlowDraftTests(unittest.IsolatedAsyncioTestCase):
self.assertIsNone(state.get_entry("pending_rental_drafts", 21))
self.assertIn("LOC-TESTE-123", response)
async def test_rental_flow_preserves_relative_dates_from_initial_request_until_vehicle_selection(self):
    """Relative dates ("amanha"/"depois de amanha") from the opening rental
    request are resolved against the fixed clock, stored on the draft, and
    reused unchanged when the user later selects a vehicle.
    """
    fixed_now = lambda: datetime(2026, 3, 19, 9, 0)  # fixed clock: 19/03/2026 09:00
    state = FakeState(contexts={21: self._base_context()})
    registry = FakeRegistry()
    flow = RentalFlowHarness(state=state, registry=registry, rental_now_provider=fixed_now)
    list_response = await flow._try_collect_and_open_rental(
        message="Quero alugar um hatch amanha 10h ate depois de amanha 10h",
        user_id=21,
        extracted_fields={},
        intents={},
        turn_decision={"intent": "rental_create", "domain": "rental", "action": "answer_user"},
    )
    # First turn searches the fleet by category only (no model filter).
    self.assertEqual(registry.calls[0][0], "consultar_frota_aluguel")
    self.assertNotIn("modelo", registry.calls[0][1])
    draft = state.get_entry("pending_rental_drafts", 21)
    self.assertIsNotNone(draft)
    self.assertEqual(draft["payload"]["data_inicio"], "20/03/2026 10:00")
    self.assertEqual(draft["payload"]["data_fim_prevista"], "21/03/2026 10:00")
    self.assertEqual(draft["payload"]["categoria"], "hatch")
    self.assertIn("veiculo(s) para locacao", list_response)
    open_response = await flow._try_collect_and_open_rental(
        message="1",
        user_id=21,
        extracted_fields={},
        intents={},
        turn_decision={"intent": "rental_create", "domain": "rental", "action": "answer_user"},
    )
    # Vehicle selection opens the rental with the originally resolved dates.
    self.assertEqual(registry.calls[1][0], "abrir_locacao_aluguel")
    self.assertEqual(registry.calls[1][1]["data_inicio"], "20/03/2026 10:00")
    self.assertEqual(registry.calls[1][1]["data_fim_prevista"], "21/03/2026 10:00")
    self.assertIn("LOC-TESTE-123", open_response)
async def test_rental_flow_preserves_relative_dates_even_when_day_words_arrive_truncated(self):
    """Truncated day words ("amanh", "at") — e.g. lost trailing accented chars —
    still resolve to the same relative dates as the full spelling.
    """
    fixed_now = lambda: datetime(2026, 3, 19, 9, 0)  # fixed clock: 19/03/2026 09:00
    state = FakeState(contexts={21: self._base_context()})
    registry = FakeRegistry()
    flow = RentalFlowHarness(state=state, registry=registry, rental_now_provider=fixed_now)
    list_response = await flow._try_collect_and_open_rental(
        message="Quero alugar um hatch amanh 10h at depois de amanh 10h",
        user_id=21,
        extracted_fields={},
        intents={},
        turn_decision={"intent": "rental_create", "domain": "rental", "action": "answer_user"},
    )
    self.assertEqual(registry.calls[0][0], "consultar_frota_aluguel")
    draft = state.get_entry("pending_rental_drafts", 21)
    self.assertIsNotNone(draft)
    self.assertEqual(draft["payload"]["data_inicio"], "20/03/2026 10:00")
    self.assertEqual(draft["payload"]["data_fim_prevista"], "21/03/2026 10:00")
    self.assertNotIn("modelo", registry.calls[0][1])
    self.assertIn("veiculo(s) para locacao", list_response)
    open_response = await flow._try_collect_and_open_rental(
        message="1",
        user_id=21,
        extracted_fields={},
        intents={},
        turn_decision={"intent": "rental_create", "domain": "rental", "action": "answer_user"},
    )
    self.assertEqual(registry.calls[1][0], "abrir_locacao_aluguel")
    self.assertEqual(registry.calls[1][1]["data_inicio"], "20/03/2026 10:00")
    self.assertEqual(registry.calls[1][1]["data_fim_prevista"], "21/03/2026 10:00")
    self.assertIn("LOC-TESTE-123", open_response)
async def test_rental_flow_preserves_relative_dates_even_when_day_words_arrive_with_question_marks(self):
    """Mojibake day words where accents became '?' ("amanh?", "at?") still
    resolve to the same relative dates.
    """
    fixed_now = lambda: datetime(2026, 3, 19, 9, 0)  # fixed clock: 19/03/2026 09:00
    state = FakeState(contexts={21: self._base_context()})
    registry = FakeRegistry()
    flow = RentalFlowHarness(state=state, registry=registry, rental_now_provider=fixed_now)
    list_response = await flow._try_collect_and_open_rental(
        message="Quero alugar um hatch amanh? 10h at? depois de amanh? 10h",
        user_id=21,
        extracted_fields={},
        intents={},
        turn_decision={"intent": "rental_create", "domain": "rental", "action": "answer_user"},
    )
    self.assertEqual(registry.calls[0][0], "consultar_frota_aluguel")
    draft = state.get_entry("pending_rental_drafts", 21)
    self.assertIsNotNone(draft)
    self.assertEqual(draft["payload"]["data_inicio"], "20/03/2026 10:00")
    self.assertEqual(draft["payload"]["data_fim_prevista"], "21/03/2026 10:00")
    self.assertNotIn("modelo", registry.calls[0][1])
    self.assertIn("veiculo(s) para locacao", list_response)
    open_response = await flow._try_collect_and_open_rental(
        message="1",
        user_id=21,
        extracted_fields={},
        intents={},
        turn_decision={"intent": "rental_create", "domain": "rental", "action": "answer_user"},
    )
    self.assertEqual(registry.calls[1][0], "abrir_locacao_aluguel")
    self.assertEqual(registry.calls[1][1]["data_inicio"], "20/03/2026 10:00")
    self.assertEqual(registry.calls[1][1]["data_fim_prevista"], "21/03/2026 10:00")
    self.assertIn("LOC-TESTE-123", open_response)
async def test_rental_flow_rehydrates_search_payload_from_context_when_selection_survives_without_draft(self):
    """When only the pending selection list survives (no draft), the flow
    rebuilds the search payload stashed on the selection entry and opens the
    rental for the chosen vehicle with those dates.
    """
    state = FakeState(
        entries={
            "pending_rental_selections": {
                21: {
                    "payload": [
                        {"id": 1, "placa": "RAA1A01", "modelo": "Chevrolet Tracker", "categoria": "hatch", "ano": 2024, "valor_diaria": 219.9, "status": "disponivel"},
                        {"id": 2, "placa": "RAA1A02", "modelo": "Fiat Pulse", "categoria": "hatch", "ano": 2024, "valor_diaria": 189.9, "status": "disponivel"},
                    ],
                    "expires_at": utc_now() + timedelta(minutes=15),
                }
            }
        },
        contexts={
            21: self._base_context() | {
                "active_domain": "rental",
            }
        },
    )
    # Attach the original search payload directly to the surviving selection entry.
    state.get_entry("pending_rental_selections", 21)["search_payload"] = {
        "categoria": "hatch",
        "data_inicio": "20/03/2026 10:00",
        "data_fim_prevista": "21/03/2026 10:00",
    }
    registry = FakeRegistry()
    flow = RentalFlowHarness(state=state, registry=registry)
    response = await flow._try_collect_and_open_rental(
        message="2",
        user_id=21,
        extracted_fields={},
        intents={},
        turn_decision={"intent": "rental_create", "domain": "rental", "action": "answer_user"},
    )
    self.assertEqual(registry.calls[0][0], "abrir_locacao_aluguel")
    self.assertEqual(registry.calls[0][1]["rental_vehicle_id"], 2)
    self.assertEqual(registry.calls[0][1]["data_inicio"], "20/03/2026 10:00")
    self.assertEqual(registry.calls[0][1]["data_fim_prevista"], "21/03/2026 10:00")
    self.assertIn("LOC-TESTE-123", response)
async def test_rental_flow_opens_contract_after_collecting_relative_dates_follow_up(self):
    """With a vehicle already selected and a draft missing only the dates, a
    follow-up with relative dates resolves them and opens the contract.
    """
    fixed_now = lambda: datetime(2026, 3, 19, 9, 0)  # fixed clock: 19/03/2026 09:00
    state = FakeState(
        entries={
            "pending_rental_drafts": {
                21: {
                    "payload": {
                        "rental_vehicle_id": 1,
                        "placa": "RAA1A01",
                        "modelo_veiculo": "Chevrolet Tracker",
                    },
                    "expires_at": utc_now() + timedelta(minutes=15),
                }
            }
        },
        contexts={21: self._base_context() | {"active_domain": "rental", "selected_rental_vehicle": {"id": 1, "placa": "RAA1A01", "modelo": "Chevrolet Tracker", "categoria": "suv", "ano": 2024, "valor_diaria": 219.9, "status": "disponivel"}}},
    )
    registry = FakeRegistry()
    flow = RentalFlowHarness(state=state, registry=registry, rental_now_provider=fixed_now)
    response = await flow._try_collect_and_open_rental(
        message="amanha 10h ate depois de amanha 10h",
        user_id=21,
        extracted_fields={},
        intents={},
        turn_decision={"intent": "rental_create", "domain": "rental", "action": "answer_user"},
    )
    # The contract opens with resolved dates and the draft is consumed.
    self.assertEqual(registry.calls[0][0], "abrir_locacao_aluguel")
    self.assertEqual(registry.calls[0][1]["data_inicio"], "20/03/2026 10:00")
    self.assertEqual(registry.calls[0][1]["data_fim_prevista"], "21/03/2026 10:00")
    self.assertIsNone(state.get_entry("pending_rental_drafts", 21))
    self.assertIn("LOC-TESTE-123", response)
class ReviewFlowDraftTests(unittest.IsolatedAsyncioTestCase):
async def test_review_flow_extracts_relative_datetime_from_followup_message(self):
@ -2066,6 +2419,36 @@ class ReviewFlowDraftTests(unittest.IsolatedAsyncioTestCase):
self.assertIn("- o modelo do veiculo", response)
self.assertNotIn("- a data e hora desejada para a revisao", response)
async def test_review_flow_date_only_supports_day_after_tomorrow(self):
    """A date-only follow-up of "depois de amanha" lands as now + 2 days on the
    review draft's base date, and the flow then asks for the time.
    """
    fixed_now = lambda: datetime(2026, 3, 12, 9, 0)  # fixed clock: 12/03/2026 09:00
    state = FakeState(
        entries={
            "pending_review_drafts": {
                21: {
                    "payload": {"placa": "ABC1269"},
                    "expires_at": utc_now() + timedelta(minutes=30),
                }
            }
        }
    )
    registry = FakeRegistry()
    flow = ReviewFlowHarness(state=state, registry=registry, review_now_provider=fixed_now)
    response = await flow._try_collect_and_schedule_review(
        message="depois de amanha",
        user_id=21,
        extracted_fields={},
        intents={},
        turn_decision={"intent": "review_schedule", "domain": "review", "action": "answer_user"},
    )
    draft = state.get_entry("pending_review_drafts", 21)
    # Draft keeps the resolved date-only base; no scheduling yet (time missing).
    self.assertIsNotNone(draft)
    self.assertEqual(draft["payload"].get("data_hora_base"), "14/03/2026")
    self.assertIn("Perfeito. Tenho a data 14/03/2026.", response)
    self.assertIn("- o horario desejado para a revisao", response)
async def test_review_flow_keeps_review_draft_when_time_follow_up_is_misclassified_as_sales(self):
state = FakeState(
entries={
@ -2862,6 +3245,37 @@ class ReviewFlowDraftTests(unittest.IsolatedAsyncioTestCase):
self.assertEqual(registry.calls[0][1]["nova_data_hora"], "13/03/2026 11:00")
self.assertIn("13/03/2026 11:00", response)
async def test_review_management_reschedule_consumes_day_after_tomorrow_relative_datetime_follow_up(self):
    """A reschedule draft holding only the protocol consumes a
    "depois de amanha 11h" follow-up as now + 2 days at 11:00 and calls
    editar_data_revisao.
    """
    fixed_now = lambda: datetime(2026, 3, 12, 9, 0)  # fixed clock: 12/03/2026 09:00
    state = FakeState(
        entries={
            "pending_review_management_drafts": {
                21: {
                    "action": "reschedule",
                    "payload": {"protocolo": "REV-20260313-F754AF27"},
                    "expires_at": utc_now() + timedelta(minutes=30),
                }
            }
        }
    )
    registry = FakeRegistry()
    flow = ReviewFlowHarness(state=state, registry=registry, review_now_provider=fixed_now)
    response = await flow._try_handle_review_management(
        message="depois de amanha 11h",
        user_id=21,
        extracted_fields={},
        intents={},
        turn_decision={"intent": "review_reschedule", "domain": "review", "action": "answer_user"},
    )
    # Draft is consumed and the reschedule tool runs with the resolved datetime.
    self.assertIsNone(state.get_entry("pending_review_management_drafts", 21))
    self.assertEqual(registry.calls[0][0], "editar_data_revisao")
    self.assertEqual(registry.calls[0][1]["protocolo"], "REV-20260313-F754AF27")
    self.assertEqual(registry.calls[0][1]["nova_data_hora"], "14/03/2026 11:00")
    self.assertIn("14/03/2026 11:00", response)
async def test_review_management_reschedule_date_only_then_time_follow_up(self):
fixed_now = lambda: datetime(2026, 3, 12, 9, 0)
state = FakeState(
@ -2904,6 +3318,50 @@ class ReviewFlowDraftTests(unittest.IsolatedAsyncioTestCase):
self.assertEqual(registry.calls[0][1]["nova_data_hora"], "13/03/2026 11:00")
self.assertIn("13/03/2026 11:00", second_response)
async def test_review_management_reschedule_conflict_stores_pending_confirmation_suggestion(self):
    """When the registry raises a 409 reschedule conflict, the draft stays
    pending (without the rejected nova_data_hora), the backend suggestion is
    captured, and the conflict message reaches the user."""
    def fixed_now():
        return datetime(2026, 3, 12, 9, 0)

    pending_draft = {
        "action": "reschedule",
        "payload": {"protocolo": "REV-20260313-F754AF27"},
        "expires_at": utc_now() + timedelta(minutes=30),
    }
    state = FakeState(entries={"pending_review_management_drafts": {21: pending_draft}})

    registry = FakeRegistry()
    # Simulated backend conflict with an alternative slot suggestion.
    registry.raise_http_exception = HTTPException(
        status_code=409,
        detail={
            "code": "review_reschedule_conflict",
            "message": "O horario 14/03/2026 as 11:00 ja esta ocupado. Posso agendar em 14/03/2026 as 12:00.",
            "retryable": True,
            "field": "nova_data_hora",
            "suggested_iso": "2026-03-14T12:00:00-03:00",
        },
    )
    flow = ReviewFlowHarness(state=state, registry=registry, review_now_provider=fixed_now)

    response = await flow._try_handle_review_management(
        message="depois de amanha 11h",
        user_id=21,
        extracted_fields={},
        intents={},
        turn_decision={"intent": "review_reschedule", "domain": "review", "action": "answer_user"},
    )

    # Draft survives the conflict, but the rejected datetime is not persisted.
    draft = state.get_entry("pending_review_management_drafts", 21)
    self.assertIsNotNone(draft)
    self.assertNotIn("nova_data_hora", draft["payload"])

    # The flow records exactly one confirmation suggestion for the tool call.
    self.assertEqual(len(flow.captured_suggestions), 1)
    suggestion = flow.captured_suggestions[0]
    self.assertEqual(suggestion["tool_name"], "editar_data_revisao")
    self.assertEqual(suggestion["arguments"]["protocolo"], "REV-20260313-F754AF27")
    self.assertEqual(suggestion["arguments"]["nova_data_hora"], "14/03/2026 11:00")
    self.assertIn("ocupado", response)
async def test_review_management_infers_listing_intent_from_agendamentos_message(self):
state = FakeState()
registry = FakeRegistry()
# NOTE(review): diff-viewer hunk marker "@@ -3192,7 +3650,3 @@ class ToolRegistryExecutionTests" — lines elided here.
# Allow running this test module directly with `python <module>`.
if __name__ == "__main__":
    unittest.main()
# NOTE(review): diff-viewer hunk marker "@@ -62,6 +62,25 @@ class LLMServiceResponseParsingTests" — a second file begins here; lines elided.
self.assertEqual(payload, {"response": "Resposta simples", "tool_call": None})
def test_extract_response_payload_falls_back_to_response_text_accessor(self):
    """When no candidate part carries a function_call, the parser falls back
    to the response-level ``text`` accessor and returns it verbatim."""
    # Bypass __init__ so no Vertex client is constructed.
    service = LLMService.__new__(LLMService)

    textless_part = SimpleNamespace(function_call=None)
    candidate = SimpleNamespace(content=SimpleNamespace(parts=[textless_part]))
    response = SimpleNamespace(text='{"ok": true}', candidates=[candidate])

    payload = service._extract_response_payload(response)

    # The raw text is passed through unparsed, with no tool call.
    self.assertEqual(payload, {"response": '{"ok": true}', "tool_call": None})
class LLMServiceImageWorkflowPromptTests(unittest.TestCase):
def test_build_image_workflow_prompt_preserves_visible_payment_time(self):
# NOTE(review): diff-viewer hunk marker "@@ -102,3 +121,87 @@" — lines elided here; the preceding test is truncated in this paste.
response,
"Registrar pagamento de aluguel: contrato LOC-20260319-33CD6567; valor R$ 379,80.",
)
class LLMServiceDispatchTests(unittest.IsolatedAsyncioTestCase):
    """Checks that ``generate_response`` dispatches to ``generate_content``
    when no history is supplied and to a chat session when history exists."""

    @staticmethod
    def _stub_service(model):
        # Build a bare LLMService (no __init__, no network) wired to `model`.
        service = LLMService.__new__(LLMService)
        service.model_names = ["gemini-2.5-pro"]
        service._log_llm_event = lambda *args, **kwargs: None
        service.build_vertex_tools = lambda tools: None
        service._get_model = lambda model_name: model
        service._extract_response_payload = lambda response: {"response": "ok", "tool_call": None}
        return service

    async def test_generate_response_uses_generate_content_when_history_is_empty(self):
        class RecordingChat:
            def __init__(self):
                self.calls = []

            def send_message(self, message, **kwargs):
                self.calls.append((message, kwargs))
                return SimpleNamespace(candidates=[])

        class RecordingModel:
            def __init__(self):
                self.generate_calls = []
                self.chat = RecordingChat()

            def generate_content(self, message, **kwargs):
                self.generate_calls.append((message, kwargs))
                return SimpleNamespace(candidates=[])

            def start_chat(self, history):
                # Opening a chat with an empty history would be a regression.
                raise AssertionError("nao deveria abrir chat quando nao ha historico")

        model = RecordingModel()
        service = self._stub_service(model)
        generation_config = {"temperature": 0, "max_output_tokens": 128}

        payload = await service.generate_response(
            message="teste",
            tools=[],
            history=[],
            generation_config=generation_config,
        )

        self.assertEqual(payload, {"response": "ok", "tool_call": None})
        # The single-shot path must forward the generation config as-is.
        self.assertEqual(
            model.generate_calls,
            [("teste", {"generation_config": generation_config})],
        )

    async def test_generate_response_uses_chat_when_history_is_present(self):
        class RecordingChat:
            def __init__(self):
                self.calls = []

            def send_message(self, message, **kwargs):
                self.calls.append((message, kwargs))
                return SimpleNamespace(candidates=[])

        class RecordingModel:
            def __init__(self):
                self.chat = RecordingChat()
                self.histories = []

            def generate_content(self, message, **kwargs):
                # The single-shot path must not be taken when history exists.
                raise AssertionError("nao deveria usar generate_content quando ha historico")

            def start_chat(self, history):
                self.histories.append(history)
                return self.chat

        model = RecordingModel()
        service = self._stub_service(model)
        history = [{"role": "user", "parts": ["oi"]}]

        payload = await service.generate_response(message="teste", tools=[], history=history)

        self.assertEqual(payload, {"response": "ok", "tool_call": None})
        # The chat was seeded with the caller's history and got the message.
        self.assertEqual(model.histories, [history])
        self.assertEqual(model.chat.calls, [("teste", {})])

# NOTE(review): diff-viewer UI residue removed ("File diff suppressed because it is too large", "Loading…", "Cancel", "Save") — not part of the source file.