📊 feat(observability): instrumentar latencia ponta a ponta e reaproveitar entidades do turno

chore/observability-latency-markers
parent 31d02a7daa
commit 9a97345f62

@ -3,6 +3,7 @@ import logging
import os import os
import tempfile import tempfile
from datetime import timedelta from datetime import timedelta
from time import perf_counter
from typing import Any, Dict, List from typing import Any, Dict, List
import aiohttp import aiohttp
@ -153,6 +154,12 @@ class TelegramSatelliteService:
self._chat_workers_lock = asyncio.Lock() self._chat_workers_lock = asyncio.Lock()
self._chat_processing_semaphore = asyncio.Semaphore(TELEGRAM_MAX_CONCURRENT_CHATS) self._chat_processing_semaphore = asyncio.Semaphore(TELEGRAM_MAX_CONCURRENT_CHATS)
def _log_telegram_event(self, event: str, **payload) -> None:
    """Log a structured Telegram event at info level, masking sensitive payload fields."""
    masked_payload = mask_sensitive_payload(payload)
    logger.info("telegram_event=%s payload=%s", event, masked_payload)
def _elapsed_ms(self, started_at: float) -> float:
return round((perf_counter() - started_at) * 1000, 2)
async def run(self) -> None: async def run(self) -> None:
"""Inicia loop de long polling para consumir atualizacoes do bot.""" """Inicia loop de long polling para consumir atualizacoes do bot."""
logger.info("Telegram satellite iniciado com long polling.") logger.info("Telegram satellite iniciado com long polling.")
@ -292,9 +299,19 @@ class TelegramSatelliteService:
if queue is None: if queue is None:
queue = asyncio.Queue() queue = asyncio.Queue()
self._chat_queues[chat_id] = queue self._chat_queues[chat_id] = queue
update["_orq_enqueued_at_perf"] = perf_counter()
queue.put_nowait(update) queue.put_nowait(update)
queue_size = queue.qsize()
worker = self._chat_workers.get(chat_id) worker = self._chat_workers.get(chat_id)
worker_active = worker is not None and not worker.done()
self._log_telegram_event(
"chat_update_enqueued",
chat_id=chat_id,
update_id=update.get("update_id"),
queue_size=queue_size,
worker_active=worker_active,
)
if worker is None or worker.done(): if worker is None or worker.done():
self._chat_workers[chat_id] = asyncio.create_task( self._chat_workers[chat_id] = asyncio.create_task(
self._run_chat_worker( self._run_chat_worker(
@ -315,6 +332,19 @@ class TelegramSatelliteService:
try: try:
while True: while True:
update = await queue.get() update = await queue.get()
queued_at_perf = update.get("_orq_enqueued_at_perf")
queue_wait_ms = (
round((perf_counter() - queued_at_perf) * 1000, 2)
if isinstance(queued_at_perf, (int, float))
else None
)
self._log_telegram_event(
"chat_update_dequeued",
chat_id=chat_id,
update_id=update.get("update_id"),
queue_wait_ms=queue_wait_ms,
queue_size=queue.qsize(),
)
try: try:
async with self._chat_processing_semaphore: async with self._chat_processing_semaphore:
await self._handle_update(session=session, update=update) await self._handle_update(session=session, update=update)
@ -410,12 +440,28 @@ class TelegramSatelliteService:
if offset is not None: if offset is not None:
payload["offset"] = offset payload["offset"] = offset
started_at = perf_counter()
async with session.post(f"{self.base_url}/getUpdates", json=payload) as response: async with session.post(f"{self.base_url}/getUpdates", json=payload) as response:
data = await response.json() data = await response.json()
if not data.get("ok"): if not data.get("ok"):
self._log_telegram_event(
"get_updates_failed",
offset=offset,
elapsed_ms=self._elapsed_ms(started_at),
response=data,
)
logger.warning("Falha em getUpdates: %s", data) logger.warning("Falha em getUpdates: %s", data)
return [] return []
return data.get("result", []) updates = data.get("result", [])
if updates:
self._log_telegram_event(
"get_updates_completed",
offset=offset,
updates_count=len(updates),
elapsed_ms=self._elapsed_ms(started_at),
)
return updates
async def _handle_update( async def _handle_update(
self, self,
@ -423,11 +469,13 @@ class TelegramSatelliteService:
update: Dict[str, Any], update: Dict[str, Any],
) -> None: ) -> None:
"""Processa uma atualizacao recebida e envia resposta ao chat.""" """Processa uma atualizacao recebida e envia resposta ao chat."""
started_at = perf_counter()
message = update.get("message", {}) message = update.get("message", {})
text = message.get("text") or message.get("caption") text = message.get("text") or message.get("caption")
chat = message.get("chat", {}) chat = message.get("chat", {})
chat_id = chat.get("id") chat_id = chat.get("id")
sender = message.get("from", {}) sender = message.get("from", {})
update_id = update.get("update_id")
if not chat_id: if not chat_id:
return return
@ -441,11 +489,29 @@ class TelegramSatelliteService:
self._build_update_idempotency_key(update), self._build_update_idempotency_key(update),
) )
await self._deliver_message(session=session, chat_id=chat_id, text=cached_answer) await self._deliver_message(session=session, chat_id=chat_id, text=cached_answer)
self._log_telegram_event(
"update_completed",
update_id=update_id,
chat_id=chat_id,
cached_hit=True,
elapsed_ms=self._elapsed_ms(started_at),
input_chars=len(str(text or "")),
answer_chars=len(cached_answer),
image_count=0,
)
return return
image_attachments = await self._extract_image_attachments(session=session, message=message) image_attachments = await self._extract_image_attachments(session=session, message=message)
image_count = len(image_attachments)
if not text and not image_attachments: if not text and not image_attachments:
self._log_telegram_event(
"update_ignored",
update_id=update_id,
chat_id=chat_id,
reason="empty_text_and_no_image",
elapsed_ms=self._elapsed_ms(started_at),
)
return return
try: try:
@ -463,10 +529,19 @@ class TelegramSatelliteService:
answer = "Nao consegui processar sua solicitacao agora. Tente novamente em instantes." answer = "Nao consegui processar sua solicitacao agora. Tente novamente em instantes."
self._store_processed_update(update=update, answer=answer) self._store_processed_update(update=update, answer=answer)
update_id = update.get("update_id")
if isinstance(update_id, int): if isinstance(update_id, int):
self._persist_last_processed_update_id(update_id) self._persist_last_processed_update_id(update_id)
await self._deliver_message(session=session, chat_id=chat_id, text=answer) await self._deliver_message(session=session, chat_id=chat_id, text=answer)
self._log_telegram_event(
"update_completed",
update_id=update_id,
chat_id=chat_id,
cached_hit=False,
elapsed_ms=self._elapsed_ms(started_at),
input_chars=len(str(text or "")),
answer_chars=len(str(answer or "")),
image_count=image_count,
)
async def _deliver_message( async def _deliver_message(
self, self,
@ -488,17 +563,24 @@ class TelegramSatelliteService:
text: str, text: str,
) -> None: ) -> None:
"""Envia mensagem de texto para o chat informado no Telegram.""" """Envia mensagem de texto para o chat informado no Telegram."""
for chunk_index, chunk in enumerate(_split_telegram_text(text), start=1): chunks = _split_telegram_text(text)
started_at = perf_counter()
total_attempts = 0
successful_chunks = 0
for chunk_index, chunk in enumerate(chunks, start=1):
payload = { payload = {
"chat_id": chat_id, "chat_id": chat_id,
"text": chunk, "text": chunk,
} }
for attempt in range(1, TELEGRAM_SEND_MESSAGE_MAX_ATTEMPTS + 1): for attempt in range(1, TELEGRAM_SEND_MESSAGE_MAX_ATTEMPTS + 1):
total_attempts += 1
try: try:
async with session.post(f"{self.base_url}/sendMessage", json=payload) as response: async with session.post(f"{self.base_url}/sendMessage", json=payload) as response:
data = await response.json() data = await response.json()
if not data.get("ok"): if not data.get("ok"):
logger.warning("Falha em sendMessage: %s", data) logger.warning("Falha em sendMessage: %s", data)
else:
successful_chunks += 1
break break
except (aiohttp.ClientError, asyncio.TimeoutError, OSError) as exc: except (aiohttp.ClientError, asyncio.TimeoutError, OSError) as exc:
if attempt >= TELEGRAM_SEND_MESSAGE_MAX_ATTEMPTS: if attempt >= TELEGRAM_SEND_MESSAGE_MAX_ATTEMPTS:
@ -522,6 +604,16 @@ class TelegramSatelliteService:
) )
await asyncio.sleep(delay_seconds) await asyncio.sleep(delay_seconds)
self._log_telegram_event(
"send_message_completed",
chat_id=chat_id,
chunk_count=len(chunks),
successful_chunks=successful_chunks,
total_attempts=total_attempts,
elapsed_ms=self._elapsed_ms(started_at),
text_chars=len(str(text or "")),
)
# Processa uma mensagem do Telegram e injeta o texto extraido de imagens quando houver. # Processa uma mensagem do Telegram e injeta o texto extraido de imagens quando houver.
async def _process_message( async def _process_message(
self, self,
@ -531,22 +623,49 @@ class TelegramSatelliteService:
image_attachments: List[Dict[str, Any]] | None = None, image_attachments: List[Dict[str, Any]] | None = None,
) -> str: ) -> str:
"""Encaminha mensagem ao orquestrador com usuario identificado e retorna resposta.""" """Encaminha mensagem ao orquestrador com usuario identificado e retorna resposta."""
started_at = perf_counter()
message_text = text message_text = text
image_processing_ms = None
if image_attachments: if image_attachments:
image_started_at = perf_counter()
image_message = await self._build_orchestration_message_from_image( image_message = await self._build_orchestration_message_from_image(
caption=text, caption=text,
image_attachments=image_attachments, image_attachments=image_attachments,
) )
image_processing_ms = self._elapsed_ms(image_started_at)
if self._is_image_analysis_failure_message(image_message): if self._is_image_analysis_failure_message(image_message):
self._log_telegram_event(
"process_message_completed",
chat_id=chat_id,
elapsed_ms=self._elapsed_ms(started_at),
image_processing_ms=image_processing_ms,
orchestration_offloaded=False,
used_image_attachments=True,
input_chars=len(str(text or "")),
response_chars=len(image_message),
)
return image_message return image_message
message_text = image_message message_text = image_message
return await asyncio.to_thread( orchestration_started_at = perf_counter()
answer = await asyncio.to_thread(
self._run_blocking_orchestration_turn, self._run_blocking_orchestration_turn,
message_text=message_text, message_text=message_text,
sender=sender, sender=sender,
chat_id=chat_id, chat_id=chat_id,
) )
self._log_telegram_event(
"process_message_completed",
chat_id=chat_id,
elapsed_ms=self._elapsed_ms(started_at),
image_processing_ms=image_processing_ms,
orchestration_ms=self._elapsed_ms(orchestration_started_at),
orchestration_offloaded=True,
used_image_attachments=bool(image_attachments),
input_chars=len(message_text),
response_chars=len(str(answer or "")),
)
return answer
def _run_blocking_orchestration_turn( def _run_blocking_orchestration_turn(
self, self,
@ -559,9 +678,11 @@ class TelegramSatelliteService:
Executa o turno do orquestrador fora do loop async principal. Executa o turno do orquestrador fora do loop async principal.
Isso isola sessoes SQLAlchemy sincronas e outras operacoes bloqueantes. Isso isola sessoes SQLAlchemy sincronas e outras operacoes bloqueantes.
""" """
started_at = perf_counter()
tools_db = SessionLocal() tools_db = SessionLocal()
mock_db = SessionMockLocal() mock_db = SessionMockLocal()
try: try:
user_resolution_started_at = perf_counter()
user_service = UserService(mock_db) user_service = UserService(mock_db)
external_id = str(sender.get("id") or chat_id) external_id = str(sender.get("id") or chat_id)
first_name = (sender.get("first_name") or "").strip() first_name = (sender.get("first_name") or "").strip()
@ -575,12 +696,30 @@ class TelegramSatelliteService:
name=display_name, name=display_name,
username=username, username=username,
) )
user_resolution_ms = self._elapsed_ms(user_resolution_started_at)
service_init_started_at = perf_counter()
service = OrquestradorService( service = OrquestradorService(
tools_db, tools_db,
state_repository=self.state, state_repository=self.state,
) )
return asyncio.run(service.handle_message(message=message_text, user_id=user.id)) service_init_ms = self._elapsed_ms(service_init_started_at)
orchestration_started_at = perf_counter()
response = asyncio.run(service.handle_message(message=message_text, user_id=user.id))
orchestration_ms = self._elapsed_ms(orchestration_started_at)
self._log_telegram_event(
"blocking_turn_completed",
chat_id=chat_id,
user_id=user.id,
elapsed_ms=self._elapsed_ms(started_at),
user_resolution_ms=user_resolution_ms,
service_init_ms=service_init_ms,
orchestration_ms=orchestration_ms,
input_chars=len(message_text),
response_chars=len(str(response or "")),
)
return response
finally: finally:
tools_db.close() tools_db.close()
mock_db.close() mock_db.close()

@ -1,5 +1,7 @@
import asyncio import asyncio
import json import json
import logging
from time import perf_counter
from typing import Dict, Any, List, Optional from typing import Dict, Any, List, Optional
import vertexai import vertexai
@ -9,6 +11,8 @@ from vertexai.generative_models import FunctionDeclaration, GenerativeModel, Par
from app.core.settings import settings from app.core.settings import settings
from app.models.tool_model import ToolDefinition from app.models.tool_model import ToolDefinition
logger = logging.getLogger(__name__)
IMAGE_ANALYSIS_FAILURE_MESSAGE = "Nao consegui identificar os dados da imagem. Descreva o documento ou envie uma foto mais nitida." IMAGE_ANALYSIS_FAILURE_MESSAGE = "Nao consegui identificar os dados da imagem. Descreva o documento ou envie uma foto mais nitida."
INVALID_RECEIPT_WATERMARK_MESSAGE = "O comprovante enviado nao e valido. Envie um comprovante valido com a marca d'agua SysaltiIA visivel." INVALID_RECEIPT_WATERMARK_MESSAGE = "O comprovante enviado nao e valido. Envie um comprovante valido com a marca d'agua SysaltiIA visivel."
VALID_RECEIPT_WATERMARK_MARKER = "[watermark_sysaltiia_ok]" VALID_RECEIPT_WATERMARK_MARKER = "[watermark_sysaltiia_ok]"
@ -37,6 +41,9 @@ class LLMService:
fallback_models = ["gemini-2.5-pro", "gemini-2.5-flash", "gemini-2.0-flash-001"] fallback_models = ["gemini-2.5-pro", "gemini-2.5-flash", "gemini-2.0-flash-001"]
self.model_names = [configured] + [m for m in fallback_models if m != configured] self.model_names = [configured] + [m for m in fallback_models if m != configured]
def _log_llm_event(self, event: str, **payload) -> None:
    """Emit a structured info-level log line for an LLM service event.

    NOTE(review): unlike the Telegram-side event logger, *payload* is logged
    unmasked here — confirm no sensitive data reaches these events.
    """
    logger.info("llm_service_event=%s payload=%s", event, payload)
# Transforma anexos de imagem em uma mensagem textual pronta para o orquestrador. # Transforma anexos de imagem em uma mensagem textual pronta para o orquestrador.
async def extract_image_workflow_message( async def extract_image_workflow_message(
self, self,
@ -63,10 +70,15 @@ class LLMService:
response = None response = None
last_error = None last_error = None
selected_model_name = None
attempts = 0
started_at = perf_counter()
for model_name in self.model_names: for model_name in self.model_names:
attempts += 1
try: try:
model = self._get_model(model_name) model = self._get_model(model_name)
response = await asyncio.to_thread(model.generate_content, contents) response = await asyncio.to_thread(model.generate_content, contents)
selected_model_name = model_name
break break
except NotFound as err: except NotFound as err:
last_error = err last_error = err
@ -74,6 +86,13 @@ class LLMService:
continue continue
if response is None: if response is None:
self._log_llm_event(
"image_workflow_failed",
elapsed_ms=round((perf_counter() - started_at) * 1000, 2),
attempts=attempts,
attachments_count=len(attachments),
caption_present=bool(str(caption or "").strip()),
)
if last_error: if last_error:
raise RuntimeError( raise RuntimeError(
f"Nenhum modelo Vertex disponivel para analise de imagem. Erro: {last_error}" f"Nenhum modelo Vertex disponivel para analise de imagem. Erro: {last_error}"
@ -81,6 +100,14 @@ class LLMService:
raise RuntimeError("Falha ao analisar imagem no Vertex AI.") raise RuntimeError("Falha ao analisar imagem no Vertex AI.")
payload = self._extract_response_payload(response) payload = self._extract_response_payload(response)
self._log_llm_event(
"image_workflow_completed",
model_name=selected_model_name,
elapsed_ms=round((perf_counter() - started_at) * 1000, 2),
attempts=attempts,
attachments_count=len(attachments),
caption_present=bool(str(caption or "").strip()),
)
extracted_text = (payload.get("response") or "").strip() or (caption or "").strip() extracted_text = (payload.get("response") or "").strip() or (caption or "").strip()
return self._coerce_image_workflow_response(extracted_text) return self._coerce_image_workflow_response(extracted_text)
@ -214,15 +241,20 @@ class LLMService:
response = None response = None
last_error = None last_error = None
selected_model_name = None
attempts = 0
started_at = perf_counter()
# Tenta o modelo configurado e cai para nomes alternativos # Tenta o modelo configurado e cai para nomes alternativos
# quando o principal nao estiver disponivel no projeto/regiao. # quando o principal nao estiver disponivel no projeto/regiao.
for model_name in self.model_names: for model_name in self.model_names:
attempts += 1
try: try:
model = self._get_model(model_name) model = self._get_model(model_name)
chat = model.start_chat(history=history or []) chat = model.start_chat(history=history or [])
send_kwargs = {"tools": vertex_tools} if vertex_tools else {} send_kwargs = {"tools": vertex_tools} if vertex_tools else {}
response = await asyncio.to_thread(chat.send_message, message, **send_kwargs) response = await asyncio.to_thread(chat.send_message, message, **send_kwargs)
selected_model_name = model_name
break break
except NotFound as err: except NotFound as err:
last_error = err last_error = err
@ -230,13 +262,30 @@ class LLMService:
continue continue
if response is None: if response is None:
self._log_llm_event(
"generate_response_failed",
elapsed_ms=round((perf_counter() - started_at) * 1000, 2),
attempts=attempts,
tools_count=len(tools or []),
history_count=len(history or []),
)
if last_error: if last_error:
raise RuntimeError( raise RuntimeError(
f"Nenhum modelo Vertex disponivel. Verifique VERTEX_MODEL_NAME e acesso no projeto. Erro: {last_error}" f"Nenhum modelo Vertex disponivel. Verifique VERTEX_MODEL_NAME e acesso no projeto. Erro: {last_error}"
) from last_error ) from last_error
raise RuntimeError("Falha ao gerar resposta no Vertex AI.") raise RuntimeError("Falha ao gerar resposta no Vertex AI.")
return self._extract_response_payload(response) payload = self._extract_response_payload(response)
self._log_llm_event(
"generate_response_completed",
model_name=selected_model_name,
elapsed_ms=round((perf_counter() - started_at) * 1000, 2),
attempts=attempts,
tools_count=len(tools or []),
history_count=len(history or []),
tool_call=bool(payload.get("tool_call")),
)
return payload
async def warmup(self) -> None: async def warmup(self) -> None:
"""Preaquece conexao/modelo para reduzir latencia da primeira requisicao real.""" """Preaquece conexao/modelo para reduzir latencia da primeira requisicao real."""

@ -299,18 +299,33 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin, RentalFlowMixin):
user_id=user_id, user_id=user_id,
) )
self._capture_turn_decision_trace(turn_decision) self._capture_turn_decision_trace(turn_decision)
llm_extracted_entities = await self._extract_entities_with_llm( decision_entities = self._extracted_entities_from_turn_decision(turn_decision)
message=routing_message, if self._has_useful_extraction(decision_entities):
user_id=user_id,
)
extracted_entities = self._merge_extracted_entities(
extracted_entities,
llm_extracted_entities,
)
if self._has_useful_turn_decision(turn_decision):
extracted_entities = self._merge_extracted_entities( extracted_entities = self._merge_extracted_entities(
extracted_entities, extracted_entities,
self._extracted_entities_from_turn_decision(turn_decision), decision_entities,
)
if not self._has_useful_extraction(extracted_entities):
llm_extracted_entities = await self._extract_entities_with_llm(
message=routing_message,
user_id=user_id,
)
extracted_entities = self._merge_extracted_entities(
extracted_entities,
llm_extracted_entities,
)
else:
started_at = perf_counter()
self._emit_turn_stage_metric(
"extract_entities_short_circuit",
started_at,
has_message_plan_entities=self._has_useful_extraction(
self._resolve_entities_for_message_plan(
message_plan=message_plan,
routed_message=routing_message,
)
),
has_turn_decision_entities=self._has_useful_extraction(decision_entities),
) )
self._capture_generic_memory( self._capture_generic_memory(
user_id=user_id, user_id=user_id,
@ -1686,7 +1701,9 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin, RentalFlowMixin):
# Nessa funcao eu configuro a memoria volatil do sistema # Nessa funcao eu configuro a memoria volatil do sistema
def _upsert_user_context(self, user_id: int | None) -> None: def _upsert_user_context(self, user_id: int | None) -> None:
started_at = perf_counter()
self._context_manager.upsert_user_context(user_id=user_id) self._context_manager.upsert_user_context(user_id=user_id)
self._emit_turn_stage_metric("upsert_user_context", started_at)
def _get_user_context(self, user_id: int | None) -> dict | None: def _get_user_context(self, user_id: int | None) -> dict | None:
return self._context_manager.get_user_context(user_id) return self._context_manager.get_user_context(user_id)
@ -1738,6 +1755,7 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin, RentalFlowMixin):
user = self._get_user_record(user_id=user_id) user = self._get_user_record(user_id=user_id)
if not user or not getattr(user, "email", None): if not user or not getattr(user, "email", None):
return return
started_at = perf_counter()
try: try:
sync_user_email_integration_routes( sync_user_email_integration_routes(
user_id=user.id, user_id=user.id,
@ -1745,6 +1763,11 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin, RentalFlowMixin):
recipient_name=user.name, recipient_name=user.name,
) )
self._user_profile_routes_ready = True self._user_profile_routes_ready = True
self._emit_turn_stage_metric(
"ensure_user_email_routes",
started_at,
synced_routes_count=6,
)
except Exception: except Exception:
logger.exception( logger.exception(
"Falha ao sincronizar rotas de email do usuario.", "Falha ao sincronizar rotas de email do usuario.",
@ -1953,16 +1976,50 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin, RentalFlowMixin):
return self.normalizer.coerce_extraction_contract(payload) return self.normalizer.coerce_extraction_contract(payload)
async def _extract_message_plan_with_llm(self, message: str, user_id: int | None) -> dict: async def _extract_message_plan_with_llm(self, message: str, user_id: int | None) -> dict:
return await self.planner.extract_message_plan(message=message, user_id=user_id) started_at = perf_counter()
result = await self.planner.extract_message_plan(message=message, user_id=user_id)
self._emit_turn_stage_metric(
"extract_message_plan",
started_at,
order_count=len(result.get("orders") or []) if isinstance(result, dict) else 0,
)
return result
async def _extract_routing_with_llm(self, message: str, user_id: int | None) -> dict: async def _extract_routing_with_llm(self, message: str, user_id: int | None) -> dict:
return await self.planner.extract_routing(message=message, user_id=user_id) started_at = perf_counter()
result = await self.planner.extract_routing(message=message, user_id=user_id)
self._emit_turn_stage_metric(
"extract_routing",
started_at,
order_count=len(result.get("orders") or []) if isinstance(result, dict) else 0,
)
return result
async def _extract_entities_with_llm(self, message: str, user_id: int | None) -> dict: async def _extract_entities_with_llm(self, message: str, user_id: int | None) -> dict:
return await self.planner.extract_entities(message=message, user_id=user_id) started_at = perf_counter()
result = await self.planner.extract_entities(message=message, user_id=user_id)
self._emit_turn_stage_metric(
"extract_entities",
started_at,
has_generic_memory=bool((result or {}).get("generic_memory")),
review_field_keys=[
key
for key, value in ((result or {}).get("review_fields") or {}).items()
if value not in (None, "", [], {})
],
)
return result
async def _extract_sales_search_context_with_llm(self, message: str, user_id: int | None) -> dict: async def _extract_sales_search_context_with_llm(self, message: str, user_id: int | None) -> dict:
return await self.planner.extract_sales_search_context(message=message, user_id=user_id) started_at = perf_counter()
result = await self.planner.extract_sales_search_context(message=message, user_id=user_id)
self._emit_turn_stage_metric(
"extract_sales_search_context",
started_at,
has_budget=bool((result or {}).get("orcamento_max")),
profile_count=len((result or {}).get("perfil_veiculo") or []),
)
return result
async def _extract_missing_sales_search_context_with_llm( async def _extract_missing_sales_search_context_with_llm(
self, self,
@ -1985,7 +2042,16 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin, RentalFlowMixin):
return await self._extract_sales_search_context_with_llm(message=message, user_id=user_id) return await self._extract_sales_search_context_with_llm(message=message, user_id=user_id)
async def _extract_turn_decision_with_llm(self, message: str, user_id: int | None) -> dict: async def _extract_turn_decision_with_llm(self, message: str, user_id: int | None) -> dict:
return await self.planner.extract_turn_decision(message=message, user_id=user_id) started_at = perf_counter()
result = await self.planner.extract_turn_decision(message=message, user_id=user_id)
self._emit_turn_stage_metric(
"extract_turn_decision",
started_at,
intent=str((result or {}).get("intent") or ""),
action=str((result or {}).get("action") or ""),
domain=str((result or {}).get("domain") or ""),
)
return result
async def _try_handle_immediate_context_reset( async def _try_handle_immediate_context_reset(
self, self,
@ -2773,6 +2839,14 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin, RentalFlowMixin):
def _format_turn_error(self, exc: Exception) -> str: def _format_turn_error(self, exc: Exception) -> str:
return self._execution_manager.format_turn_error(exc) return self._execution_manager.format_turn_error(exc)
def _emit_turn_stage_metric(self, operation: str, started_at: float, **payload) -> None:
self._log_turn_event(
"turn_stage_completed",
operation=operation,
elapsed_ms=round((perf_counter() - started_at) * 1000, 2),
**payload,
)
def _log_turn_event(self, event: str, **payload) -> None: def _log_turn_event(self, event: str, **payload) -> None:
self._execution_manager.log_turn_event(event, **payload) self._execution_manager.log_turn_event(event, **payload)

@ -4788,6 +4788,180 @@ class TurnDecisionContractTests(unittest.IsolatedAsyncioTestCase):
class OrquestradorLatencyOptimizationTests(unittest.IsolatedAsyncioTestCase):
    """Verify that handle_message reuses entities from the turn decision.

    When the turn-decision call already returned useful entities, the dedicated
    entity-extraction LLM call must be skipped; when the decision's entities
    payload is empty, the dedicated extraction must still run exactly once.
    """

    def _build_service(self, state=None):
        """Build an OrquestradorService stub with all side-effecting hooks neutralized."""
        # Seed a minimal per-user context so handle_message can run end to end.
        default_state = state or FakeState(
            contexts={
                1: {
                    "active_domain": "general",
                    "generic_memory": {},
                    "shared_memory": {},
                    "order_queue": [],
                    "pending_order_selection": None,
                    "pending_switch": None,
                    "last_stock_results": [],
                    "selected_vehicle": None,
                }
            }
        )
        # __new__ bypasses __init__ so no DB sessions or LLM clients are created.
        service = OrquestradorService.__new__(OrquestradorService)
        service.state = default_state
        service.normalizer = EntityNormalizer()
        service.history_service = SimpleNamespace(record_turn=lambda **kwargs: None)
        service._get_user_context = lambda user_id: default_state.get_user_context(user_id)
        service._save_user_context = lambda user_id, context: default_state.save_user_context(user_id, context)
        service._log_turn_event = lambda *args, **kwargs: None
        service._compose_order_aware_response = lambda response, user_id, queue_notice=None: response
        service._append_email_capture_prompt_if_needed = lambda response, user_id: response
        service._finalize_turn_history = lambda **kwargs: None
        service._upsert_user_context = lambda user_id: None
        service._ensure_user_email_routes = lambda user_id: None
        service._capture_turn_decision_trace = lambda turn_decision: None
        service._capture_generic_memory = lambda **kwargs: None
        service._domain_from_intents = lambda intents: "general"
        service._handle_context_switch = lambda **kwargs: None
        service._update_active_domain = lambda **kwargs: None

        async def fake_maybe_auto_advance_next_order(base_response: str, user_id: int | None):
            return base_response

        async def fake_none(**kwargs):
            return None

        service._maybe_auto_advance_next_order = fake_maybe_auto_advance_next_order
        # Neutralize every follow-up/handler hook so only the extraction path is exercised.
        service._try_handle_pending_email_capture_message = fake_none
        service._try_handle_pending_stock_selection_follow_up = fake_none
        service._try_handle_active_sales_follow_up = fake_none
        service._try_handle_pending_rental_selection_follow_up = fake_none
        service._try_handle_active_rental_follow_up = fake_none
        service._try_handle_active_review_follow_up = fake_none
        service._try_handle_current_rental_info_request = fake_none
        service._try_handle_immediate_context_reset = fake_none
        service._try_resolve_pending_order_selection = fake_none
        service._try_continue_queued_order = fake_none
        service._try_handle_deterministic_rental_management = fake_none
        service._try_execute_orchestration_control_tool = fake_none
        service._try_handle_trade_in_evaluation = fake_none
        service._try_execute_business_tool_from_turn_decision = fake_none
        service._try_handle_review_management = fake_none
        service._try_confirm_pending_review = fake_none
        service._try_collect_and_schedule_review = fake_none
        service._try_collect_and_cancel_order = fake_none
        service._try_handle_order_listing = fake_none
        service._try_collect_and_open_rental = fake_none
        service._extract_missing_sales_search_context_with_llm = fake_none
        service._prepare_message_for_single_order = lambda message, user_id, routing_plan=None: (message, None, None)
        service._resolve_entities_for_message_plan = lambda message_plan, routed_message: service.normalizer.empty_extraction_payload()
        return service

    async def test_handle_message_keeps_message_plan_but_skips_entity_extraction_when_turn_decision_is_enough(self):
        """Entity extraction must be short-circuited when the decision already has entities."""
        service = self._build_service()
        planner_calls = []

        # The turn decision already carries a useful entity (orcamento_max).
        async def fake_extract_turn_decision(message: str, user_id: int | None):
            return {
                "intent": "order_create",
                "domain": "sales",
                "action": "ask_missing_fields",
                "entities": {
                    "generic_memory": {"orcamento_max": 70000},
                    "review_fields": {},
                    "review_management_fields": {},
                    "order_fields": {},
                    "cancel_order_fields": {},
                },
                "missing_fields": ["modelo_veiculo"],
                "selection_index": None,
                "tool_name": None,
                "tool_arguments": {},
                "response_to_user": None,
            }

        async def fake_extract_message_plan(message: str, user_id: int | None):
            planner_calls.append((message, user_id))
            return {
                "orders": [
                    {
                        "domain": "sales",
                        "message": message,
                        "entities": service.normalizer.empty_extraction_payload(),
                    }
                ]
            }

        # Guard: fails the test if the dedicated extraction is ever invoked.
        async def should_not_run_entities(message: str, user_id: int | None):
            raise AssertionError("extracao dedicada nao deveria rodar quando a decisao ja trouxe entidades")

        async def fake_try_collect_and_create_order(**kwargs):
            return "Fluxo de venda continuado."

        service._extract_turn_decision_with_llm = fake_extract_turn_decision
        service._extract_message_plan_with_llm = fake_extract_message_plan
        service._extract_entities_with_llm = should_not_run_entities
        service._try_collect_and_create_order = fake_try_collect_and_create_order

        response = await service.handle_message("quero comprar um carro ate 70 mil", user_id=1)

        # The message plan still runs once; the response flows from the order handler.
        self.assertEqual(len(planner_calls), 1)
        self.assertEqual(response, "Fluxo de venda continuado.")

    async def test_handle_message_runs_entity_extraction_when_turn_decision_entities_are_empty(self):
        """Dedicated extraction must still run once when the decision entities are empty."""
        service = self._build_service()
        planner_calls = []
        entity_calls = []

        # The turn decision returns an empty entities payload.
        async def fake_extract_turn_decision(message: str, user_id: int | None):
            return {
                "intent": "order_create",
                "domain": "sales",
                "action": "ask_missing_fields",
                "entities": service.normalizer.empty_extraction_payload(),
                "missing_fields": ["modelo_veiculo"],
                "selection_index": None,
                "tool_name": None,
                "tool_arguments": {},
                "response_to_user": None,
            }

        async def fake_extract_message_plan(message: str, user_id: int | None):
            planner_calls.append((message, user_id))
            return {
                "orders": [
                    {
                        "domain": "sales",
                        "message": message,
                        "entities": service.normalizer.empty_extraction_payload(),
                    }
                ]
            }

        async def fake_extract_entities(message: str, user_id: int | None):
            entity_calls.append((message, user_id))
            return {
                "generic_memory": {"orcamento_max": 70000},
                "review_fields": {},
                "review_management_fields": {},
                "order_fields": {},
                "cancel_order_fields": {},
                "intents": {},
            }

        async def fake_try_collect_and_create_order(**kwargs):
            return "Fluxo de venda continuado."

        service._extract_turn_decision_with_llm = fake_extract_turn_decision
        service._extract_message_plan_with_llm = fake_extract_message_plan
        service._extract_entities_with_llm = fake_extract_entities
        service._try_collect_and_create_order = fake_try_collect_and_create_order

        response = await service.handle_message("quero comprar um carro ate 70 mil", user_id=1)

        # Both the plan and the dedicated extraction ran exactly once.
        self.assertEqual(len(planner_calls), 1)
        self.assertEqual(len(entity_calls), 1)
        self.assertEqual(response, "Fluxo de venda continuado.")
class OrquestradorEmailCaptureTests(unittest.IsolatedAsyncioTestCase): class OrquestradorEmailCaptureTests(unittest.IsolatedAsyncioTestCase):
def _build_service(self, state=None): def _build_service(self, state=None):

Loading…
Cancel
Save