diff --git a/app/integrations/telegram_satellite_service.py b/app/integrations/telegram_satellite_service.py index 8defeb9..b753f95 100644 --- a/app/integrations/telegram_satellite_service.py +++ b/app/integrations/telegram_satellite_service.py @@ -3,6 +3,7 @@ import logging import os import tempfile from datetime import timedelta +from time import perf_counter from typing import Any, Dict, List import aiohttp @@ -153,6 +154,12 @@ class TelegramSatelliteService: self._chat_workers_lock = asyncio.Lock() self._chat_processing_semaphore = asyncio.Semaphore(TELEGRAM_MAX_CONCURRENT_CHATS) + def _log_telegram_event(self, event: str, **payload) -> None: + logger.info("telegram_event=%s payload=%s", event, mask_sensitive_payload(payload)) + + def _elapsed_ms(self, started_at: float) -> float: + return round((perf_counter() - started_at) * 1000, 2) + async def run(self) -> None: """Inicia loop de long polling para consumir atualizacoes do bot.""" logger.info("Telegram satellite iniciado com long polling.") @@ -292,9 +299,19 @@ class TelegramSatelliteService: if queue is None: queue = asyncio.Queue() self._chat_queues[chat_id] = queue + update["_orq_enqueued_at_perf"] = perf_counter() queue.put_nowait(update) + queue_size = queue.qsize() worker = self._chat_workers.get(chat_id) + worker_active = worker is not None and not worker.done() + self._log_telegram_event( + "chat_update_enqueued", + chat_id=chat_id, + update_id=update.get("update_id"), + queue_size=queue_size, + worker_active=worker_active, + ) if worker is None or worker.done(): self._chat_workers[chat_id] = asyncio.create_task( self._run_chat_worker( @@ -315,6 +332,19 @@ class TelegramSatelliteService: try: while True: update = await queue.get() + queued_at_perf = update.get("_orq_enqueued_at_perf") + queue_wait_ms = ( + round((perf_counter() - queued_at_perf) * 1000, 2) + if isinstance(queued_at_perf, (int, float)) + else None + ) + self._log_telegram_event( + "chat_update_dequeued", + chat_id=chat_id, + update_id=update.get("update_id"), + queue_wait_ms=queue_wait_ms, + queue_size=queue.qsize(), + ) try: async with self._chat_processing_semaphore: await self._handle_update(session=session, update=update) @@ -410,12 +440,28 @@ class TelegramSatelliteService: if offset is not None: payload["offset"] = offset + started_at = perf_counter() async with session.post(f"{self.base_url}/getUpdates", json=payload) as response: data = await response.json() if not data.get("ok"): + self._log_telegram_event( + "get_updates_failed", + offset=offset, + elapsed_ms=self._elapsed_ms(started_at), + response=data, + ) logger.warning("Falha em getUpdates: %s", data) return [] - return data.get("result", []) + updates = data.get("result", []) + + if updates: + self._log_telegram_event( + "get_updates_completed", + offset=offset, + updates_count=len(updates), + elapsed_ms=self._elapsed_ms(started_at), + ) + return updates async def _handle_update( self, @@ -423,11 +469,13 @@ class TelegramSatelliteService: update: Dict[str, Any], ) -> None: """Processa uma atualizacao recebida e envia resposta ao chat.""" + started_at = perf_counter() message = update.get("message", {}) text = message.get("text") or message.get("caption") chat = message.get("chat", {}) chat_id = chat.get("id") sender = message.get("from", {}) + update_id = update.get("update_id") if not chat_id: return @@ -441,11 +489,29 @@ class TelegramSatelliteService: self._build_update_idempotency_key(update), ) await self._deliver_message(session=session, chat_id=chat_id, text=cached_answer) + self._log_telegram_event( + "update_completed", + update_id=update_id, + chat_id=chat_id, + cached_hit=True, + elapsed_ms=self._elapsed_ms(started_at), + input_chars=len(str(text or "")), + answer_chars=len(cached_answer), + image_count=0, + ) return image_attachments = await self._extract_image_attachments(session=session, message=message) + image_count = len(image_attachments) if not text and not image_attachments: + self._log_telegram_event( + "update_ignored", + update_id=update_id, + chat_id=chat_id, + reason="empty_text_and_no_image", + elapsed_ms=self._elapsed_ms(started_at), + ) return try: @@ -463,10 +529,19 @@ class TelegramSatelliteService: answer = "Nao consegui processar sua solicitacao agora. Tente novamente em instantes." self._store_processed_update(update=update, answer=answer) - update_id = update.get("update_id") if isinstance(update_id, int): self._persist_last_processed_update_id(update_id) await self._deliver_message(session=session, chat_id=chat_id, text=answer) + self._log_telegram_event( + "update_completed", + update_id=update_id, + chat_id=chat_id, + cached_hit=False, + elapsed_ms=self._elapsed_ms(started_at), + input_chars=len(str(text or "")), + answer_chars=len(str(answer or "")), + image_count=image_count, + ) async def _deliver_message( self, @@ -488,17 +563,24 @@ class TelegramSatelliteService: text: str, ) -> None: """Envia mensagem de texto para o chat informado no Telegram.""" - for chunk_index, chunk in enumerate(_split_telegram_text(text), start=1): + chunks = _split_telegram_text(text) + started_at = perf_counter() + total_attempts = 0 + successful_chunks = 0 + for chunk_index, chunk in enumerate(chunks, start=1): payload = { "chat_id": chat_id, "text": chunk, } for attempt in range(1, TELEGRAM_SEND_MESSAGE_MAX_ATTEMPTS + 1): + total_attempts += 1 try: async with session.post(f"{self.base_url}/sendMessage", json=payload) as response: data = await response.json() if not data.get("ok"): logger.warning("Falha em sendMessage: %s", data) + else: + successful_chunks += 1 break except (aiohttp.ClientError, asyncio.TimeoutError, OSError) as exc: if attempt >= TELEGRAM_SEND_MESSAGE_MAX_ATTEMPTS: @@ -522,6 +604,16 @@ class TelegramSatelliteService: ) await asyncio.sleep(delay_seconds) + self._log_telegram_event( + "send_message_completed", + chat_id=chat_id, + chunk_count=len(chunks), + successful_chunks=successful_chunks, + total_attempts=total_attempts, + elapsed_ms=self._elapsed_ms(started_at), + text_chars=len(str(text or "")), + ) + # Processa uma mensagem do Telegram e injeta o texto extraido de imagens quando houver. async def _process_message( self, @@ -531,22 +623,49 @@ class TelegramSatelliteService: image_attachments: List[Dict[str, Any]] | None = None, ) -> str: """Encaminha mensagem ao orquestrador com usuario identificado e retorna resposta.""" + started_at = perf_counter() message_text = text + image_processing_ms = None if image_attachments: + image_started_at = perf_counter() image_message = await self._build_orchestration_message_from_image( caption=text, image_attachments=image_attachments, ) + image_processing_ms = self._elapsed_ms(image_started_at) if self._is_image_analysis_failure_message(image_message): + self._log_telegram_event( + "process_message_completed", + chat_id=chat_id, + elapsed_ms=self._elapsed_ms(started_at), + image_processing_ms=image_processing_ms, + orchestration_offloaded=False, + used_image_attachments=True, + input_chars=len(str(text or "")), + response_chars=len(image_message), + ) return image_message message_text = image_message - return await asyncio.to_thread( + orchestration_started_at = perf_counter() + answer = await asyncio.to_thread( self._run_blocking_orchestration_turn, message_text=message_text, sender=sender, chat_id=chat_id, ) + self._log_telegram_event( + "process_message_completed", + chat_id=chat_id, + elapsed_ms=self._elapsed_ms(started_at), + image_processing_ms=image_processing_ms, + orchestration_ms=self._elapsed_ms(orchestration_started_at), + orchestration_offloaded=True, + used_image_attachments=bool(image_attachments), + input_chars=len(message_text), + response_chars=len(str(answer or "")), + ) + return answer def _run_blocking_orchestration_turn( self, @@ -559,9 +678,11 @@ class TelegramSatelliteService: Executa o turno do orquestrador fora do loop async principal. Isso isola sessoes SQLAlchemy sincronas e outras operacoes bloqueantes. """ + started_at = perf_counter() tools_db = SessionLocal() mock_db = SessionMockLocal() try: + user_resolution_started_at = perf_counter() user_service = UserService(mock_db) external_id = str(sender.get("id") or chat_id) first_name = (sender.get("first_name") or "").strip() @@ -575,12 +696,30 @@ class TelegramSatelliteService: name=display_name, username=username, ) + user_resolution_ms = self._elapsed_ms(user_resolution_started_at) + service_init_started_at = perf_counter() service = OrquestradorService( tools_db, state_repository=self.state, ) - return asyncio.run(service.handle_message(message=message_text, user_id=user.id)) + service_init_ms = self._elapsed_ms(service_init_started_at) + + orchestration_started_at = perf_counter() + response = asyncio.run(service.handle_message(message=message_text, user_id=user.id)) + orchestration_ms = self._elapsed_ms(orchestration_started_at) + self._log_telegram_event( + "blocking_turn_completed", + chat_id=chat_id, + user_id=user.id, + elapsed_ms=self._elapsed_ms(started_at), + user_resolution_ms=user_resolution_ms, + service_init_ms=service_init_ms, + orchestration_ms=orchestration_ms, + input_chars=len(message_text), + response_chars=len(str(response or "")), + ) + return response finally: tools_db.close() mock_db.close() diff --git a/app/services/ai/llm_service.py b/app/services/ai/llm_service.py index 8581251..37c5f69 100644 --- a/app/services/ai/llm_service.py +++ b/app/services/ai/llm_service.py @@ -1,5 +1,7 @@ import asyncio import json +import logging +from time import perf_counter from typing import Dict, Any, List, Optional import vertexai @@ -9,6 +11,8 @@ from vertexai.generative_models import FunctionDeclaration, GenerativeModel, Par from app.core.settings import settings from app.models.tool_model import ToolDefinition +logger = logging.getLogger(__name__) + IMAGE_ANALYSIS_FAILURE_MESSAGE = "Nao consegui identificar os dados da imagem. Descreva o documento ou envie uma foto mais nitida." INVALID_RECEIPT_WATERMARK_MESSAGE = "O comprovante enviado nao e valido. Envie um comprovante valido com a marca d'agua SysaltiIA visivel." VALID_RECEIPT_WATERMARK_MARKER = "[watermark_sysaltiia_ok]" @@ -37,6 +41,9 @@ class LLMService: fallback_models = ["gemini-2.5-pro", "gemini-2.5-flash", "gemini-2.0-flash-001"] self.model_names = [configured] + [m for m in fallback_models if m != configured] + def _log_llm_event(self, event: str, **payload) -> None: + logger.info("llm_service_event=%s payload=%s", event, payload) + # Transforma anexos de imagem em uma mensagem textual pronta para o orquestrador. async def extract_image_workflow_message( self, @@ -63,10 +70,15 @@ class LLMService: response = None last_error = None + selected_model_name = None + attempts = 0 + started_at = perf_counter() for model_name in self.model_names: + attempts += 1 try: model = self._get_model(model_name) response = await asyncio.to_thread(model.generate_content, contents) + selected_model_name = model_name break except NotFound as err: last_error = err @@ -74,6 +86,13 @@ class LLMService: continue if response is None: + self._log_llm_event( + "image_workflow_failed", + elapsed_ms=round((perf_counter() - started_at) * 1000, 2), + attempts=attempts, + attachments_count=len(attachments), + caption_present=bool(str(caption or "").strip()), + ) if last_error: raise RuntimeError( f"Nenhum modelo Vertex disponivel para analise de imagem. Erro: {last_error}" @@ -81,6 +100,14 @@ class LLMService: raise RuntimeError("Falha ao analisar imagem no Vertex AI.") payload = self._extract_response_payload(response) + self._log_llm_event( + "image_workflow_completed", + model_name=selected_model_name, + elapsed_ms=round((perf_counter() - started_at) * 1000, 2), + attempts=attempts, + attachments_count=len(attachments), + caption_present=bool(str(caption or "").strip()), + ) extracted_text = (payload.get("response") or "").strip() or (caption or "").strip() return self._coerce_image_workflow_response(extracted_text) @@ -214,15 +241,20 @@ class LLMService: response = None last_error = None + selected_model_name = None + attempts = 0 + started_at = perf_counter() # Tenta o modelo configurado e cai para nomes alternativos # quando o principal nao estiver disponivel no projeto/regiao. for model_name in self.model_names: + attempts += 1 try: model = self._get_model(model_name) chat = model.start_chat(history=history or []) send_kwargs = {"tools": vertex_tools} if vertex_tools else {} response = await asyncio.to_thread(chat.send_message, message, **send_kwargs) + selected_model_name = model_name break except NotFound as err: last_error = err @@ -230,13 +262,30 @@ class LLMService: continue if response is None: + self._log_llm_event( + "generate_response_failed", + elapsed_ms=round((perf_counter() - started_at) * 1000, 2), + attempts=attempts, + tools_count=len(tools or []), + history_count=len(history or []), + ) if last_error: raise RuntimeError( f"Nenhum modelo Vertex disponivel. Verifique VERTEX_MODEL_NAME e acesso no projeto. Erro: {last_error}" ) from last_error raise RuntimeError("Falha ao gerar resposta no Vertex AI.") - return self._extract_response_payload(response) + payload = self._extract_response_payload(response) + self._log_llm_event( + "generate_response_completed", + model_name=selected_model_name, + elapsed_ms=round((perf_counter() - started_at) * 1000, 2), + attempts=attempts, + tools_count=len(tools or []), + history_count=len(history or []), + tool_call=bool(payload.get("tool_call")), + ) + return payload async def warmup(self) -> None: """Preaquece conexao/modelo para reduzir latencia da primeira requisicao real.""" diff --git a/app/services/orchestration/orquestrador_service.py b/app/services/orchestration/orquestrador_service.py index 9eeb61f..5da494b 100644 --- a/app/services/orchestration/orquestrador_service.py +++ b/app/services/orchestration/orquestrador_service.py @@ -299,18 +299,33 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin, RentalFlowMixin): user_id=user_id, ) self._capture_turn_decision_trace(turn_decision) - llm_extracted_entities = await self._extract_entities_with_llm( - message=routing_message, - user_id=user_id, - ) - extracted_entities = self._merge_extracted_entities( - extracted_entities, - llm_extracted_entities, - ) - if self._has_useful_turn_decision(turn_decision): + decision_entities = self._extracted_entities_from_turn_decision(turn_decision) + if self._has_useful_extraction(decision_entities): extracted_entities = self._merge_extracted_entities( extracted_entities, - self._extracted_entities_from_turn_decision(turn_decision), + decision_entities, + ) + if not self._has_useful_extraction(extracted_entities): + llm_extracted_entities = await self._extract_entities_with_llm( + message=routing_message, + user_id=user_id, + ) + extracted_entities = self._merge_extracted_entities( + extracted_entities, + llm_extracted_entities, + ) + else: + started_at = perf_counter() + self._emit_turn_stage_metric( + "extract_entities_short_circuit", + started_at, + has_message_plan_entities=self._has_useful_extraction( + self._resolve_entities_for_message_plan( + message_plan=message_plan, + routed_message=routing_message, + ) + ), + has_turn_decision_entities=self._has_useful_extraction(decision_entities), ) self._capture_generic_memory( user_id=user_id, @@ -1686,7 +1701,9 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin, RentalFlowMixin): # Nessa funcao eu configuro a memoria volatil do sistema def _upsert_user_context(self, user_id: int | None) -> None: + started_at = perf_counter() self._context_manager.upsert_user_context(user_id=user_id) + self._emit_turn_stage_metric("upsert_user_context", started_at) def _get_user_context(self, user_id: int | None) -> dict | None: return self._context_manager.get_user_context(user_id) @@ -1738,6 +1755,7 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin, RentalFlowMixin): user = self._get_user_record(user_id=user_id) if not user or not getattr(user, "email", None): return + started_at = perf_counter() try: sync_user_email_integration_routes( user_id=user.id, @@ -1745,6 +1763,11 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin, RentalFlowMixin): recipient_name=user.name, ) self._user_profile_routes_ready = True + self._emit_turn_stage_metric( + "ensure_user_email_routes", + started_at, + synced_routes_count=6, + ) except Exception: logger.exception( "Falha ao sincronizar rotas de email do usuario.", @@ -1953,16 +1976,50 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin, RentalFlowMixin): return self.normalizer.coerce_extraction_contract(payload) async def _extract_message_plan_with_llm(self, message: str, user_id: int | None) -> dict: - return await self.planner.extract_message_plan(message=message, user_id=user_id) + started_at = perf_counter() + result = await self.planner.extract_message_plan(message=message, user_id=user_id) + self._emit_turn_stage_metric( + "extract_message_plan", + started_at, + order_count=len(result.get("orders") or []) if isinstance(result, dict) else 0, + ) + return result async def _extract_routing_with_llm(self, message: str, user_id: int | None) -> dict: - return await self.planner.extract_routing(message=message, user_id=user_id) + started_at = perf_counter() + result = await self.planner.extract_routing(message=message, user_id=user_id) + self._emit_turn_stage_metric( + "extract_routing", + started_at, + order_count=len(result.get("orders") or []) if isinstance(result, dict) else 0, + ) + return result async def _extract_entities_with_llm(self, message: str, user_id: int | None) -> dict: - return await self.planner.extract_entities(message=message, user_id=user_id) + started_at = perf_counter() + result = await self.planner.extract_entities(message=message, user_id=user_id) + self._emit_turn_stage_metric( + "extract_entities", + started_at, + has_generic_memory=bool((result or {}).get("generic_memory")), + review_field_keys=[ + key + for key, value in ((result or {}).get("review_fields") or {}).items() + if value not in (None, "", [], {}) + ], + ) + return result async def _extract_sales_search_context_with_llm(self, message: str, user_id: int | None) -> dict: - return await self.planner.extract_sales_search_context(message=message, user_id=user_id) + started_at = perf_counter() + result = await self.planner.extract_sales_search_context(message=message, user_id=user_id) + self._emit_turn_stage_metric( + "extract_sales_search_context", + started_at, + has_budget=bool((result or {}).get("orcamento_max")), + profile_count=len((result or {}).get("perfil_veiculo") or []), + ) + return result async def _extract_missing_sales_search_context_with_llm( self, @@ -1985,7 +2042,16 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin, RentalFlowMixin): return await self._extract_sales_search_context_with_llm(message=message, user_id=user_id) async def _extract_turn_decision_with_llm(self, message: str, user_id: int | None) -> dict: - return await self.planner.extract_turn_decision(message=message, user_id=user_id) + started_at = perf_counter() + result = await self.planner.extract_turn_decision(message=message, user_id=user_id) + self._emit_turn_stage_metric( + "extract_turn_decision", + started_at, + intent=str((result or {}).get("intent") or ""), + action=str((result or {}).get("action") or ""), + domain=str((result or {}).get("domain") or ""), + ) + return result async def _try_handle_immediate_context_reset( self, @@ -2773,6 +2839,14 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin, RentalFlowMixin): def _format_turn_error(self, exc: Exception) -> str: return self._execution_manager.format_turn_error(exc) + def _emit_turn_stage_metric(self, operation: str, started_at: float, **payload) -> None: + self._log_turn_event( + "turn_stage_completed", + operation=operation, + elapsed_ms=round((perf_counter() - started_at) * 1000, 2), + **payload, + ) + def _log_turn_event(self, event: str, **payload) -> None: self._execution_manager.log_turn_event(event, **payload) diff --git a/tests/test_turn_decision_contract.py b/tests/test_turn_decision_contract.py index 6b654f2..d3325ee 100644 --- a/tests/test_turn_decision_contract.py +++ b/tests/test_turn_decision_contract.py @@ -4788,6 +4788,180 @@ class TurnDecisionContractTests(unittest.IsolatedAsyncioTestCase): +class OrquestradorLatencyOptimizationTests(unittest.IsolatedAsyncioTestCase): + def _build_service(self, state=None): + default_state = state or FakeState( + contexts={ + 1: { + "active_domain": "general", + "generic_memory": {}, + "shared_memory": {}, + "order_queue": [], + "pending_order_selection": None, + "pending_switch": None, + "last_stock_results": [], + "selected_vehicle": None, + } + } + ) + service = OrquestradorService.__new__(OrquestradorService) + service.state = default_state + service.normalizer = EntityNormalizer() + service.history_service = SimpleNamespace(record_turn=lambda **kwargs: None) + service._get_user_context = lambda user_id: default_state.get_user_context(user_id) + service._save_user_context = lambda user_id, context: default_state.save_user_context(user_id, context) + service._log_turn_event = lambda *args, **kwargs: None + service._compose_order_aware_response = lambda response, user_id, queue_notice=None: response + service._append_email_capture_prompt_if_needed = lambda response, user_id: response + service._finalize_turn_history = lambda **kwargs: None + service._upsert_user_context = lambda user_id: None + service._ensure_user_email_routes = lambda user_id: None + service._capture_turn_decision_trace = lambda turn_decision: None + service._capture_generic_memory = lambda **kwargs: None + service._domain_from_intents = lambda intents: "general" + service._handle_context_switch = lambda **kwargs: None + service._update_active_domain = lambda **kwargs: None + + async def fake_maybe_auto_advance_next_order(base_response: str, user_id: int | None): + return base_response + + async def fake_none(**kwargs): + return None + + service._maybe_auto_advance_next_order = fake_maybe_auto_advance_next_order + service._try_handle_pending_email_capture_message = fake_none + service._try_handle_pending_stock_selection_follow_up = fake_none + service._try_handle_active_sales_follow_up = fake_none + service._try_handle_pending_rental_selection_follow_up = fake_none + service._try_handle_active_rental_follow_up = fake_none + service._try_handle_active_review_follow_up = fake_none + service._try_handle_current_rental_info_request = fake_none + service._try_handle_immediate_context_reset = fake_none + service._try_resolve_pending_order_selection = fake_none + service._try_continue_queued_order = fake_none + service._try_handle_deterministic_rental_management = fake_none + service._try_execute_orchestration_control_tool = fake_none + service._try_handle_trade_in_evaluation = fake_none + service._try_execute_business_tool_from_turn_decision = fake_none + service._try_handle_review_management = fake_none + service._try_confirm_pending_review = fake_none + service._try_collect_and_schedule_review = fake_none + service._try_collect_and_cancel_order = fake_none + service._try_handle_order_listing = fake_none + service._try_collect_and_open_rental = fake_none + service._extract_missing_sales_search_context_with_llm = fake_none + service._prepare_message_for_single_order = lambda message, user_id, routing_plan=None: (message, None, None) + service._resolve_entities_for_message_plan = lambda message_plan, routed_message: service.normalizer.empty_extraction_payload() + return service + + async def test_handle_message_keeps_message_plan_but_skips_entity_extraction_when_turn_decision_is_enough(self): + service = self._build_service() + planner_calls = [] + + async def fake_extract_turn_decision(message: str, user_id: int | None): + return { + "intent": "order_create", + "domain": "sales", + "action": "ask_missing_fields", + "entities": { + "generic_memory": {"orcamento_max": 70000}, + "review_fields": {}, + "review_management_fields": {}, + "order_fields": {}, + "cancel_order_fields": {}, + }, + "missing_fields": ["modelo_veiculo"], + "selection_index": None, + "tool_name": None, + "tool_arguments": {}, + "response_to_user": None, + } + + async def fake_extract_message_plan(message: str, user_id: int | None): + planner_calls.append((message, user_id)) + return { + "orders": [ + { + "domain": "sales", + "message": message, + "entities": service.normalizer.empty_extraction_payload(), + } + ] + } + + async def should_not_run_entities(message: str, user_id: int | None): + raise AssertionError("extracao dedicada nao deveria rodar quando a decisao ja trouxe entidades") + + async def fake_try_collect_and_create_order(**kwargs): + return "Fluxo de venda continuado." + + service._extract_turn_decision_with_llm = fake_extract_turn_decision + service._extract_message_plan_with_llm = fake_extract_message_plan + service._extract_entities_with_llm = should_not_run_entities + service._try_collect_and_create_order = fake_try_collect_and_create_order + + response = await service.handle_message("quero comprar um carro ate 70 mil", user_id=1) + + self.assertEqual(len(planner_calls), 1) + self.assertEqual(response, "Fluxo de venda continuado.") + + async def test_handle_message_runs_entity_extraction_when_turn_decision_entities_are_empty(self): + service = self._build_service() + planner_calls = [] + entity_calls = [] + + async def fake_extract_turn_decision(message: str, user_id: int | None): + return { + "intent": "order_create", + "domain": "sales", + "action": "ask_missing_fields", + "entities": service.normalizer.empty_extraction_payload(), + "missing_fields": ["modelo_veiculo"], + "selection_index": None, + "tool_name": None, + "tool_arguments": {}, + "response_to_user": None, + } + + async def fake_extract_message_plan(message: str, user_id: int | None): + planner_calls.append((message, user_id)) + return { + "orders": [ + { + "domain": "sales", + "message": message, + "entities": service.normalizer.empty_extraction_payload(), + } + ] + } + + async def fake_extract_entities(message: str, user_id: int | None): + entity_calls.append((message, user_id)) + return { + "generic_memory": {"orcamento_max": 70000}, + "review_fields": {}, + "review_management_fields": {}, + "order_fields": {}, + "cancel_order_fields": {}, + "intents": {}, + } + + async def fake_try_collect_and_create_order(**kwargs): + return "Fluxo de venda continuado." + + service._extract_turn_decision_with_llm = fake_extract_turn_decision + service._extract_message_plan_with_llm = fake_extract_message_plan + service._extract_entities_with_llm = fake_extract_entities + service._try_collect_and_create_order = fake_try_collect_and_create_order + + response = await service.handle_message("quero comprar um carro ate 70 mil", user_id=1) + + self.assertEqual(len(planner_calls), 1) + self.assertEqual(len(entity_calls), 1) + self.assertEqual(response, "Fluxo de venda continuado.") + + + class OrquestradorEmailCaptureTests(unittest.IsolatedAsyncioTestCase): def _build_service(self, state=None):