diff --git a/app/integrations/telegram_satellite_service.py b/app/integrations/telegram_satellite_service.py index d23f068..8defeb9 100644 --- a/app/integrations/telegram_satellite_service.py +++ b/app/integrations/telegram_satellite_service.py @@ -32,6 +32,8 @@ TELEGRAM_IDEMPOTENCY_CACHE_LIMIT = 100 TELEGRAM_RUNTIME_BUCKET = "telegram_runtime_state" TELEGRAM_RUNTIME_OWNER_ID = 0 TELEGRAM_RUNTIME_CURSOR_TTL_DAYS = 30 +TELEGRAM_SEND_MESSAGE_MAX_ATTEMPTS = 3 +TELEGRAM_SEND_MESSAGE_RETRY_BASE_SECONDS = 1.0 def _split_telegram_text(text: str, limit: int = TELEGRAM_MESSAGE_SAFE_LIMIT) -> List[str]: @@ -438,7 +440,7 @@ class TelegramSatelliteService: chat_id, self._build_update_idempotency_key(update), ) - await self._send_message(session=session, chat_id=chat_id, text=cached_answer) + await self._deliver_message(session=session, chat_id=chat_id, text=cached_answer) return image_attachments = await self._extract_image_attachments(session=session, message=message) @@ -464,7 +466,20 @@ class TelegramSatelliteService: update_id = update.get("update_id") if isinstance(update_id, int): self._persist_last_processed_update_id(update_id) - await self._send_message(session=session, chat_id=chat_id, text=answer) + await self._deliver_message(session=session, chat_id=chat_id, text=answer) + + async def _deliver_message( + self, + *, + session: aiohttp.ClientSession, + chat_id: int, + text: str, + ) -> None: + """Entrega a resposta ao Telegram sem deixar falhas de transporte derrubarem o worker.""" + try: + await self._send_message(session=session, chat_id=chat_id, text=text) + except Exception: + logger.exception("Falha inesperada ao entregar mensagem ao Telegram. chat_id=%s", chat_id) async def _send_message( self, @@ -473,15 +488,39 @@ class TelegramSatelliteService: text: str, ) -> None: """Envia mensagem de texto para o chat informado no Telegram.""" - for chunk in _split_telegram_text(text): + for chunk_index, chunk in enumerate(_split_telegram_text(text), start=1): payload = { "chat_id": chat_id, "text": chunk, } - async with session.post(f"{self.base_url}/sendMessage", json=payload) as response: - data = await response.json() - if not data.get("ok"): - logger.warning("Falha em sendMessage: %s", data) + for attempt in range(1, TELEGRAM_SEND_MESSAGE_MAX_ATTEMPTS + 1): + try: + async with session.post(f"{self.base_url}/sendMessage", json=payload) as response: + data = await response.json() + if not data.get("ok"): + logger.warning("Falha em sendMessage: %s", data) + break + except (aiohttp.ClientError, asyncio.TimeoutError, OSError) as exc: + if attempt >= TELEGRAM_SEND_MESSAGE_MAX_ATTEMPTS: + logger.warning( + "Falha de transporte ao enviar mensagem ao Telegram apos %s tentativa(s). chat_id=%s chunk=%s erro=%s", + TELEGRAM_SEND_MESSAGE_MAX_ATTEMPTS, + chat_id, + chunk_index, + exc, + ) + break + delay_seconds = TELEGRAM_SEND_MESSAGE_RETRY_BASE_SECONDS * attempt + logger.warning( + "Falha temporaria ao enviar mensagem ao Telegram. chat_id=%s chunk=%s tentativa=%s/%s retry_em=%.1fs erro=%s", + chat_id, + chunk_index, + attempt, + TELEGRAM_SEND_MESSAGE_MAX_ATTEMPTS, + delay_seconds, + exc, + ) + await asyncio.sleep(delay_seconds) # Processa uma mensagem do Telegram e injeta o texto extraido de imagens quando houver. async def _process_message( diff --git a/app/services/flows/rental_flow_support.py b/app/services/flows/rental_flow_support.py index 2b924e0..8411fb7 100644 --- a/app/services/flows/rental_flow_support.py +++ b/app/services/flows/rental_flow_support.py @@ -2,7 +2,11 @@ from __future__ import annotations from datetime import timedelta +from sqlalchemy import or_ + from app.core.time_utils import utc_now +from app.db.mock_database import SessionMockLocal +from app.db.mock_models import RentalContract, RentalFine, RentalPayment from app.services.flows.flow_state_support import FlowStateSupport from app.services.orchestration import technical_normalizer from app.services.orchestration.orchestrator_config import PENDING_RENTAL_SELECTION_TTL_MINUTES @@ -11,6 +15,94 @@ from app.services.orchestration.orchestrator_config import PENDING_RENTAL_SELECT class RentalFlowStateSupport(FlowStateSupport): """Concentra estado e contexto incremental do fluxo de locacao.""" + def _load_last_rental_contract_snapshot(self, user_id: int | None) -> dict | None: + if user_id is None: + return None + db = None + try: + db = SessionMockLocal() + base_query = db.query(RentalContract).filter(RentalContract.user_id == user_id) + contract = ( + base_query.filter(RentalContract.status == "ativa") + .order_by(RentalContract.created_at.desc()) + .first() + ) + if contract is None: + contract = base_query.order_by(RentalContract.created_at.desc()).first() + if contract is None: + return None + + payload = { + "contrato_numero": contract.contrato_numero, + "placa": contract.placa, + "modelo_veiculo": contract.modelo_veiculo, + "categoria": contract.categoria, + "data_inicio": contract.data_inicio.isoformat() if contract.data_inicio else None, + "data_fim_prevista": contract.data_fim_prevista.isoformat() if contract.data_fim_prevista else None, + "data_devolucao": contract.data_devolucao.isoformat() if contract.data_devolucao else None, + "valor_diaria": contract.valor_diaria, + "valor_previsto": contract.valor_previsto, + "valor_final": contract.valor_final, + "status": contract.status, + } + + latest_payment = ( + db.query(RentalPayment) + .filter( + or_( + RentalPayment.rental_contract_id == contract.id, + RentalPayment.contrato_numero == contract.contrato_numero, + ) + ) + .order_by(RentalPayment.created_at.desc()) + .first() + ) + if latest_payment is not None: + payload.update( + { + "valor": latest_payment.valor, + "data_pagamento": latest_payment.data_pagamento.isoformat() + if latest_payment.data_pagamento + else None, + "favorecido": latest_payment.favorecido, + "status": "registrado", + } + ) + + latest_fine = ( + db.query(RentalFine) + .filter( + or_( + RentalFine.rental_contract_id == contract.id, + RentalFine.contrato_numero == contract.contrato_numero, + ) + ) + .order_by(RentalFine.created_at.desc()) + .first() + ) + if latest_fine is not None: + payload.update( + { + "auto_infracao": latest_fine.auto_infracao, + "data_infracao": latest_fine.data_infracao.isoformat() + if latest_fine.data_infracao + else None, + "vencimento": latest_fine.vencimento.isoformat() if latest_fine.vencimento else None, + } + ) + if latest_fine.valor is not None: + payload["valor_multa"] = float(latest_fine.valor) + + return self.sanitize_rental_contract_snapshot(payload) + except Exception: + return None + finally: + if db is not None: + try: + db.close() + except Exception: + pass + def sanitize_rental_results(self, rental_results: list[dict] | None) -> list[dict]: sanitized: list[dict] = [] for item in rental_results or []: @@ -104,7 +196,6 @@ class RentalFlowStateSupport(FlowStateSupport): for field_name in ( "modelo_veiculo", "categoria", - "status", "status_veiculo", "data_inicio", "data_fim_prevista", @@ -114,19 +205,57 @@ class RentalFlowStateSupport(FlowStateSupport): if value: snapshot[field_name] = value + status_value = str(payload.get("status") or "").strip() + if status_value: + if payload.get("data_pagamento"): + snapshot["status_pagamento"] = status_value + else: + snapshot["status"] = status_value + for field_name in ("valor_diaria", "valor_previsto", "valor_final"): number = technical_normalizer.normalize_positive_number(payload.get(field_name)) if number is not None: snapshot[field_name] = float(number) + payment_date = str(payload.get("data_pagamento") or "").strip() + if payment_date: + snapshot["data_pagamento"] = payment_date + payment_value = technical_normalizer.normalize_positive_number(payload.get("valor")) + if payment_value is not None: + snapshot["valor_pagamento"] = float(payment_value) + favorecido = str(payload.get("favorecido") or "").strip() + if favorecido: + snapshot["favorecido"] = favorecido + snapshot.setdefault("status_pagamento", "registrado") + + violation_date = str(payload.get("data_infracao") or "").strip() + if violation_date: + snapshot["data_infracao"] = violation_date + due_date = str(payload.get("vencimento") or "").strip() + if due_date: + snapshot["vencimento"] = due_date + infraction_notice = str(payload.get("auto_infracao") or "").strip() + if infraction_notice: + snapshot["auto_infracao"] = infraction_notice + if violation_date or infraction_notice: + fine_value = technical_normalizer.normalize_positive_number(payload.get("valor")) + if fine_value is not None: + snapshot["valor_multa"] = float(fine_value) + return snapshot def get_last_rental_contract(self, user_id: int | None) -> dict | None: context = self.service._get_user_context(user_id) - if not isinstance(context, dict): - return None - contract = context.get("last_rental_contract") - return dict(contract) if isinstance(contract, dict) else None + if isinstance(context, dict): + contract = context.get("last_rental_contract") + if isinstance(contract, dict): + return dict(contract) + + snapshot = self._load_last_rental_contract_snapshot(user_id=user_id) + if snapshot and isinstance(context, dict): + context["last_rental_contract"] = dict(snapshot) + self.service._save_user_context(user_id=user_id, context=context) + return dict(snapshot) if isinstance(snapshot, dict) else None def store_last_rental_contract(self, user_id: int | None, payload) -> None: if user_id is None: @@ -138,7 +267,14 @@ class RentalFlowStateSupport(FlowStateSupport): if sanitized is None: context.pop("last_rental_contract", None) else: - context["last_rental_contract"] = sanitized + existing = context.get("last_rental_contract") + merged = dict(existing) if isinstance(existing, dict) else {} + merged.update(sanitized) + if merged.get("data_pagamento") and not merged.get("status_pagamento"): + merged["status_pagamento"] = "registrado" + elif merged.get("contrato_numero") and not merged.get("data_devolucao") and not merged.get("status_pagamento"): + merged["status_pagamento"] = "em aberto" + context["last_rental_contract"] = merged self.service._save_user_context(user_id=user_id, context=context) def remember_rental_results(self, user_id: int | None, rental_results: list[dict] | None) -> None: diff --git a/app/services/orchestration/conversation_policy.py b/app/services/orchestration/conversation_policy.py index fc9a9c6..6f23453 100644 --- a/app/services/orchestration/conversation_policy.py +++ b/app/services/orchestration/conversation_policy.py @@ -80,6 +80,8 @@ ACTIVE_TASK_LABELS = { "rental_create": "abertura de locacao", } +ACTIONABLE_ORDER_DOMAINS = {"review", "sales", "rental"} + # essa classe é responsável por controlar qual o assunto está ativo na conversa, se existe fluxo aberto, se o usuário mandou dois pedidos ao mesmo tempo... class ConversationPolicy: def __init__(self, service: "OrquestradorService"): @@ -132,20 +134,21 @@ class ConversationPolicy: domain: str, order_message: str, memory_seed: dict | None = None, - ) -> None: + ) -> bool: context = self.service._get_user_context(user_id) if not context or domain == "general": - return + return False queue = context.setdefault("order_queue", []) queue.append( { "domain": domain, - "message": (order_message or "").strip(), + "message": self.build_order_execution_message(domain, order_message), "memory_seed": dict(memory_seed or self.service._new_tab_memory(user_id=user_id)), "created_at": utc_now().isoformat(), } ) self._save_context(user_id=user_id, context=context) + return True # Transforma as entidades extraídas de um pedido em uma memória temporária pronta para usar quando esse pedido for processado. @@ -210,7 +213,7 @@ class ConversationPolicy: if not isinstance(item, dict): continue domain = str(item.get("domain") or "general").strip().lower() - if domain not in {"review", "sales", "general"}: + if domain not in ACTIONABLE_ORDER_DOMAINS | {"general"}: domain = "general" segment = str(item.get("message") or "").strip() if segment: @@ -223,14 +226,19 @@ class ConversationPolicy: ) if not extracted_orders: extracted_orders = [{"domain": "general", "message": (message or "").strip()}] + extracted_orders = self.augment_actionable_orders_from_message( + message=message, + extracted_orders=extracted_orders, + ) + + actionable_orders = [order for order in extracted_orders if order["domain"] in ACTIONABLE_ORDER_DOMAINS] if ( - len(extracted_orders) == 2 - and all(order["domain"] != "general" for order in extracted_orders) + len(actionable_orders) >= 2 and not self.has_open_flow(user_id=user_id, domain=active_domain) ): - self.store_pending_order_selection(user_id=user_id, orders=extracted_orders) - return message, None, self.render_order_selection_prompt(extracted_orders) + self.store_pending_order_selection(user_id=user_id, orders=actionable_orders) + return message, None, self.render_order_selection_prompt(actionable_orders) if len(extracted_orders) <= 1: inferred = extracted_orders[0]["domain"] @@ -247,29 +255,33 @@ class ConversationPolicy: if self.has_open_flow(user_id=user_id, domain=active_domain): queued_count = 0 - for queued in extracted_orders: + for queued in actionable_orders: if queued["domain"] != active_domain: - self.queue_order_with_memory_seed( - user_id=user_id, - domain=queued["domain"], - order_message=queued["message"], - memory_seed=self.build_order_memory_seed(user_id=user_id, order=queued), + queued_count += int( + self.queue_order_with_memory_seed( + user_id=user_id, + domain=queued["domain"], + order_message=queued["message"], + memory_seed=self.build_order_memory_seed(user_id=user_id, order=queued), + ) ) - queued_count += 1 queue_hint = self.render_queue_notice(queued_count) prompt = self.render_open_flow_prompt(user_id=user_id, domain=active_domain) return message, None, f"{prompt}\n{queue_hint}" if queue_hint else prompt - first = extracted_orders[0] + first = actionable_orders[0] if actionable_orders else extracted_orders[0] queued_count = 0 - for queued in extracted_orders[1:]: - self.queue_order_with_memory_seed( - user_id=user_id, - domain=queued["domain"], - order_message=queued["message"], - memory_seed=self.build_order_memory_seed(user_id=user_id, order=queued), + for queued in actionable_orders: + if queued is first: + continue + queued_count += int( + self.queue_order_with_memory_seed( + user_id=user_id, + domain=queued["domain"], + order_message=queued["message"], + memory_seed=self.build_order_memory_seed(user_id=user_id, order=queued), + ) ) - queued_count += 1 context["active_domain"] = first["domain"] context["generic_memory"] = self.build_order_memory_seed(user_id=user_id, order=first) self._save_context(user_id=user_id, context=context) @@ -297,9 +309,10 @@ class ConversationPolicy: { "domain": order["domain"], "message": order["message"], + "seed_message": self.build_order_execution_message(order["domain"], order["message"]), "memory_seed": self.build_order_memory_seed(user_id=user_id, order=order), } - for order in orders[:2] + for order in orders ], "expires_at": utc_now() + timedelta(minutes=PENDING_ORDER_SELECTION_TTL_MINUTES), } @@ -310,15 +323,68 @@ class ConversationPolicy: def render_order_selection_prompt(self, orders: list[dict]) -> str: if len(orders) < 2: return "Qual das acoes voce quer iniciar primeiro?" - first_label = self.describe_order_selection_option(orders[0]) - second_label = self.describe_order_selection_option(orders[1]) + enumerated_orders = "\n".join( + f"{index}. {self.describe_order_selection_option(order)}" + for index, order in enumerate(orders, start=1) + ) return ( - "Identifiquei duas acoes na sua mensagem:\n" - f"1. {first_label}\n" - f"2. {second_label}\n" + f"Identifiquei {len(orders)} acoes na sua mensagem:\n" + f"{enumerated_orders}\n" "Qual delas voce quer iniciar primeiro? Se for indiferente, eu escolho." ) + def build_order_execution_message(self, domain: str, order_message: str | None) -> str: + raw_message = str(order_message or "").strip() + normalized = self.service.normalizer.normalize_text(raw_message).strip() + if domain == "sales" and normalized in {"compra", "comprar", "venda", "pedido"}: + return "quero comprar um veiculo" + if domain == "review" and normalized in {"revisao", "agendamento", "agendar", "marcar revisao"}: + return "quero agendar revisao" + if domain == "rental" and normalized in {"aluguel", "alugar", "locacao", "locar"}: + return "quero alugar um carro" + return raw_message + + def augment_actionable_orders_from_message(self, message: str, extracted_orders: list[dict]) -> list[dict]: + normalized = self.service.normalizer.normalize_text(message).strip() + if not normalized: + return extracted_orders + existing_domains = { + str(order.get("domain") or "general") + for order in extracted_orders + if isinstance(order, dict) + } + domain_hints = ( + ("sales", {"compra", "comprar", "venda", "pedido"}, "compra"), + ("review", {"revisao", "agendamento", "agendar", "remarcar"}, "revisao"), + ("rental", {"aluguel", "alugar", "locacao", "locar"}, "aluguel"), + ) + augmented = list(extracted_orders) + for domain, terms, label in domain_hints: + if domain in existing_domains: + continue + if any(term in normalized for term in terms): + augmented.append( + { + "domain": domain, + "message": label, + "entities": self.service.normalizer.empty_extraction_payload(), + } + ) + return augmented + + def render_multi_order_clarification_prompt(self, orders: list[dict]) -> str: + if not orders: + return "Identifiquei mais de um assunto. Me diga qual voce quer iniciar primeiro." + options = "\n".join( + f"- {self.describe_order_selection_option(order)}" + for order in orders[:3] + ) + return ( + "Identifiquei mais de um assunto na sua mensagem:\n" + f"{options}\n" + "Para eu nao misturar os fluxos, me diga qual deles voce quer comecar primeiro." + ) + # Formata o rótulo do pedido para exibição. def describe_order_selection_option(self, order: dict) -> str: @@ -429,6 +495,139 @@ class ConversationPolicy: } return self.contains_any_term(normalized, operational_terms) + def is_explicit_pending_order_selection_message( + self, + message: str, + turn_decision: dict | None = None, + ) -> bool: + if self._decision_selection_index(turn_decision) is not None: + return True + + normalized = self.strip_choice_message(self.service.normalizer.normalize_text(message)) + if not normalized: + return False + + indifferent_tokens = { + "tanto faz", + "indiferente", + "qualquer um", + "qualquer uma", + "voce escolhe", + "pode escolher", + "fica a seu criterio", + } + if normalized in indifferent_tokens: + return True + if re.fullmatch(r"(?:opcao|acao|pedido)?\s*(\d+)", normalized): + return True + + explicit_selection_messages = { + "compra", + "comprar", + "quero comprar", + "quero comprar um veiculo", + "venda", + "pedido", + "revisao", + "agendamento", + "agendar", + "agendar revisao", + "quero agendar revisao", + "aluguel", + "alugar", + "quero alugar", + "quero alugar um carro", + "locacao", + "locar", + } + return normalized in explicit_selection_messages + + def derive_operational_task_key( + self, + *, + message: str, + turn_decision: dict | None = None, + fallback_domain: str | None = None, + ) -> str | None: + normalized = self.service.normalizer.normalize_text(message).strip() + domain = self._decision_domain(turn_decision) or str(fallback_domain or "").strip().lower() + intent = self._decision_intent(turn_decision) + tool_name = str((turn_decision or {}).get("tool_name") or "").strip().lower() + + if domain == "sales": + if intent == "order_list" or tool_name == "listar_pedidos" or "quais pedidos" in normalized: + return "sales:list" + if intent == "order_cancel" or tool_name == "cancelar_pedido" or ("cancel" in normalized and "pedido" in normalized): + return "sales:cancel" + if tool_name == "avaliar_veiculo_troca" or ("avali" in normalized and "troca" in normalized): + return "sales:trade_in" + if ( + intent in {"order_create", "inventory_search"} + or tool_name in {"consultar_estoque", "realizar_pedido"} + or self.contains_any_term(normalized, {"compra", "comprar", "venda", "carro", "veiculo"}) + ): + return "sales:create" + + if domain == "review": + if intent == "review_list" or tool_name == "listar_agendamentos_revisao" or "agendamentos" in normalized: + return "review:list" + if intent == "review_cancel" or ("cancel" in normalized and "revis" in normalized): + return "review:cancel" + if intent == "review_reschedule" or "remarc" in normalized: + return "review:reschedule" + if ( + intent == "review_schedule" + or tool_name == "agendar_revisao" + or self.contains_any_term(normalized, {"revisao", "agendar", "agendamento"}) + ): + return "review:schedule" + + if domain == "rental": + if intent == "rental_list" or tool_name == "consultar_frota_aluguel" or "frota" in normalized: + return "rental:list" + if tool_name == "registrar_devolucao_aluguel" or "devol" in normalized: + return "rental:return" + if tool_name == "registrar_pagamento_aluguel" or "comprovante" in normalized or "pagamento" in normalized: + return "rental:payment" + if tool_name == "registrar_multa_aluguel" or "multa" in normalized: + return "rental:fine" + if ( + intent == "rental_create" + or self.contains_any_term(normalized, {"aluguel", "alugar", "locacao", "locar"}) + ): + return "rental:create" + + return None + + def derive_pending_order_task_key(self, order: dict) -> str | None: + return self.derive_operational_task_key( + message=str(order.get("seed_message") or order.get("message") or ""), + fallback_domain=str(order.get("domain") or "general"), + ) + + def queue_pending_orders_for_later( + self, + *, + user_id: int | None, + orders: list[dict], + skip_task_key: str | None = None, + ) -> int: + queued_count = 0 + skipped_matching_task = False + for order in orders: + if skip_task_key and not skipped_matching_task and self.derive_pending_order_task_key(order) == skip_task_key: + skipped_matching_task = True + continue + queued_count += int( + self.queue_order_with_memory_seed( + user_id=user_id, + domain=order["domain"], + order_message=order["message"], + memory_seed=order.get("memory_seed"), + ) + ) + return queued_count + # Distingue um comando global explicito de cancelamento do fluxo atual de um texto livre # que deve ser consumido como dado do rascunho aberto. @@ -518,26 +717,36 @@ class ConversationPolicy: } if normalized in indifferent_tokens: return 0, True + numeric_match = re.fullmatch(r"(?:opcao|acao|pedido)?\s*(\d+)", normalized) + if numeric_match: + candidate = int(numeric_match.group(1)) - 1 + if 0 <= candidate < len(orders): + return candidate, False if normalized in {"1", "primeiro", "primeira", "opcao 1", "acao 1", "pedido 1"}: return 0, False if normalized in {"2", "segundo", "segunda", "opcao 2", "acao 2", "pedido 2"}: return 1, False - + if normalized in {"3", "terceiro", "terceira", "opcao 3", "acao 3", "pedido 3"}: + return (2, False) if len(orders) >= 3 else (None, False) decision_domain = self._decision_domain(turn_decision) - if len(orders) >= 2 and decision_domain in {"review", "sales"}: + if len(orders) >= 2 and decision_domain in ACTIONABLE_ORDER_DOMAINS: matches = [index for index, order in enumerate(orders) if order.get("domain") == decision_domain] if len(matches) == 1: return matches[0], False review_matches = [index for index, order in enumerate(orders) if order.get("domain") == "review"] sales_matches = [index for index, order in enumerate(orders) if order.get("domain") == "sales"] + rental_matches = [index for index, order in enumerate(orders) if order.get("domain") == "rental"] has_review_signal = self.contains_any_term(normalized, {"revisao", "agendamento", "agendar", "remarcar", "pos venda"}) has_sales_signal = self.contains_any_term(normalized, {"venda", "compra", "comprar", "pedido", "cancelamento", "cancelar", "carro", "veiculo"}) + has_rental_signal = self.contains_any_term(normalized, {"aluguel", "locacao", "alugar", "locar", "devolucao", "frota"}) if len(review_matches) == 1 and has_review_signal and not has_sales_signal: return review_matches[0], False if len(sales_matches) == 1 and has_sales_signal and not has_review_signal: return sales_matches[0], False + if len(rental_matches) == 1 and has_rental_signal and not has_review_signal and not has_sales_signal: + return rental_matches[0], False return None, False @@ -572,26 +781,59 @@ class ConversationPolicy: return "Tudo bem. Limpei o contexto atual. Pode me dizer o que voce quer fazer agora?" return await self.service.handle_message(cleaned_message, user_id=user_id) + if ( + self.looks_like_fresh_operational_request(message, turn_decision=turn_decision) + and not self.is_explicit_pending_order_selection_message(message, turn_decision=turn_decision) + ): + current_task_key = self.derive_operational_task_key( + message=message, + turn_decision=turn_decision, + ) + matching_indexes = [ + index + for index, order in enumerate(orders) + if current_task_key and self.derive_pending_order_task_key(order) == current_task_key + ] + if len(matching_indexes) == 1: + selected_index = matching_indexes[0] + selected_order = orders[selected_index] + context["pending_order_selection"] = None + self.queue_pending_orders_for_later( + user_id=user_id, + orders=[order for index, order in enumerate(orders) if index != selected_index], + ) + + intro = f"Perfeito. Vou comecar por: {self.describe_order_selection_option(selected_order)}" + selected_memory = dict(selected_order.get("memory_seed") or {}) + context["active_domain"] = selected_order.get("domain") or context.get("active_domain", "general") + if selected_memory: + context["generic_memory"] = selected_memory + self._save_context(user_id=user_id, context=context) + next_response = await self.service.handle_message(message, user_id=user_id) + return f"{intro}\n{next_response}" + + context["pending_order_selection"] = None + self._save_context(user_id=user_id, context=context) + self.queue_pending_orders_for_later( + user_id=user_id, + orders=orders, + skip_task_key=current_task_key, + ) + return None + selected_index, auto_selected = self.detect_selected_order_index( message=message, orders=orders, turn_decision=turn_decision, ) if selected_index is None: - if self.looks_like_fresh_operational_request(message, turn_decision=turn_decision): - context["pending_order_selection"] = None - self._save_context(user_id=user_id, context=context) - return None return self.render_order_selection_prompt(orders) selected_order = orders[selected_index] - remaining_order = orders[1 - selected_index] context["pending_order_selection"] = None - self.queue_order_with_memory_seed( + self.queue_pending_orders_for_later( user_id=user_id, - domain=remaining_order["domain"], - order_message=remaining_order["message"], - memory_seed=remaining_order.get("memory_seed"), + orders=[order for index, order in enumerate(orders) if index != selected_index], ) intro = ( @@ -600,10 +842,12 @@ class ConversationPolicy: else f"Perfeito. Vou comecar por: {self.describe_order_selection_option(selected_order)}" ) selected_memory = dict(selected_order.get("memory_seed") or {}) + context["active_domain"] = selected_order.get("domain") or context.get("active_domain", "general") if selected_memory: context["generic_memory"] = selected_memory self._save_context(user_id=user_id, context=context) - next_response = await self.service.handle_message(str(selected_order.get("message") or ""), user_id=user_id) + selected_message = str(selected_order.get("seed_message") or selected_order.get("message") or "") + next_response = await self.service.handle_message(selected_message, user_id=user_id) return f"{intro}\n{next_response}" diff --git a/app/services/orchestration/conversation_state_store.py b/app/services/orchestration/conversation_state_store.py index 3d8413b..883df31 100644 --- a/app/services/orchestration/conversation_state_store.py +++ b/app/services/orchestration/conversation_state_store.py @@ -67,7 +67,14 @@ class ConversationStateStore(ConversationStateRepository): if user_id is None or not isinstance(context, dict): return with self._lock: - self.user_contexts[user_id] = context + stored_context = dict(context) + if "expires_at" not in stored_context: + existing = self.user_contexts.get(user_id) + if isinstance(existing, dict) and existing.get("expires_at") is not None: + stored_context["expires_at"] = existing["expires_at"] + else: + stored_context["expires_at"] = utc_now() + timedelta(minutes=60) + self.user_contexts[user_id] = stored_context def get_entry(self, bucket: str, user_id: int | None, *, expire: bool = False) -> dict | None: if user_id is None: diff --git a/app/services/orchestration/orquestrador_service.py b/app/services/orchestration/orquestrador_service.py index 883beac..b983f32 100644 --- a/app/services/orchestration/orquestrador_service.py +++ b/app/services/orchestration/orquestrador_service.py @@ -35,6 +35,7 @@ from app.services.orchestration.prompt_builders import ( from app.services.flows.review_flow import ReviewFlowMixin from app.services.orchestration.tool_executor import ToolExecutor from app.services.tools.tool_registry import ToolRegistry +from app.services.orchestration.response_formatter import format_currency_br, format_datetime_for_chat logger = logging.getLogger(__name__) @@ -86,12 +87,18 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin, RentalFlowMixin): turn_started_at = utc_now() turn_started_perf = perf_counter() turn_history_persisted = False - self._turn_trace = { + turn_trace = { "request_id": str(uuid4()), "conversation_id": f"user:{user_id}" if user_id is not None else "anonymous", "user_id": user_id, "started_at": turn_started_at, } + turn_trace_stack = getattr(self, "_turn_trace_stack", None) + if not isinstance(turn_trace_stack, list): + turn_trace_stack = [] + self._turn_trace_stack = turn_trace_stack + turn_trace_stack.append(turn_trace) + self._turn_trace = turn_trace self._log_turn_event("turn_received", message=message) async def finish(response: str, queue_notice: str | None = None) -> str: @@ -105,7 +112,7 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin, RentalFlowMixin): base_response=composed, user_id=user_id, ) - self._turn_trace["elapsed_ms"] = round((perf_counter() - turn_started_perf) * 1000, 2) + turn_trace["elapsed_ms"] = round((perf_counter() - turn_started_perf) * 1000, 2) self._log_turn_event("turn_completed", response=final_response) if not turn_history_persisted: self._finalize_turn_history( @@ -171,6 +178,13 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin, RentalFlowMixin): ) if active_review_follow_up: return active_review_follow_up + current_rental_info = await self._try_handle_current_rental_info_request( + message=message, + user_id=user_id, + finish=finish, + ) + if current_rental_info: + return current_rental_info # Faz uma leitura inicial do turno para ajudar a policy # com fila, troca de contexto e comandos globais. early_turn_decision = await self._extract_turn_decision_with_llm( @@ -537,7 +551,7 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin, RentalFlowMixin): return await finish(text, queue_notice=queue_notice) except Exception as exc: if not turn_history_persisted: - self._turn_trace["elapsed_ms"] = round((perf_counter() - turn_started_perf) * 1000, 2) + turn_trace["elapsed_ms"] = round((perf_counter() - turn_started_perf) * 1000, 2) self._finalize_turn_history( user_message=message, assistant_response=None, @@ -546,6 +560,17 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin, RentalFlowMixin): ) turn_history_persisted = True raise + finally: + current_stack = getattr(self, "_turn_trace_stack", None) + if isinstance(current_stack, list): + if current_stack and current_stack[-1] is turn_trace: + current_stack.pop() + else: + try: + current_stack.remove(turn_trace) + except ValueError: + pass + self._turn_trace = current_stack[-1] if current_stack else None async def _try_execute_orchestration_control_tool( self, @@ -885,6 +910,213 @@ class OrquestradorService(ReviewFlowMixin, OrderFlowMixin, RentalFlowMixin): arguments["placa"] = str(last_contract["placa"]) return arguments + def _has_current_rental_info_request(self, message: str, user_id: int | None = None) -> bool: + if user_id is None: + return False + last_contract = self._get_last_rental_contract(user_id) + if not isinstance(last_contract, dict): + return False + has_policy = hasattr(self, "policy") and getattr(self, "policy") is not None + if has_policy and (self._has_open_flow(user_id, "sales") or self._has_open_flow(user_id, "review")): + return False + + normalized_message = self._normalize_text(message).strip() + if not normalized_message: + return False + if ( + self._has_rental_return_management_request(message, user_id=user_id) + or self._has_rental_payment_request(message, user_id=user_id) + or self._has_rental_fine_request(message, user_id=user_id) + ): + return False + if ( + self._has_explicit_order_request(message) + or self._has_stock_listing_request(message) + or self._has_order_listing_request(message) + or self._has_trade_in_evaluation_request(message) + or self._has_rental_listing_request(message) + or self._has_explicit_rental_request(message) + ): + return False + + question_terms = ( + "qual", + "quais", + "quando", + "quanto", + "status", + "dados", + "informacoes", + "me diga", + "me informa", + "me lembra", + ) + has_question_shape = normalized_message.endswith("?") or any( + term in normalized_message for term in question_terms + ) + if not has_question_shape: + return False + + rental_anchor_terms = ( + "aluguel", + "locacao", + "contrato", + "devolucao", + "diaria", + "pagamento", + "comprovante", + ) + if any(term in normalized_message for term in rental_anchor_terms): + return True + + active_domain = str(((self._get_user_context(user_id) or {}).get("active_domain") or "")).strip().lower() + if active_domain != "rental": + return False + + contextual_detail_terms = ( + "placa", + "veiculo", + "carro", + "modelo", + "inicio", + "valor", + ) + return any(term in normalized_message for term in contextual_detail_terms) + + def _build_current_rental_info_response(self, message: str, user_id: int | None) -> str | None: + snapshot = self._get_last_rental_contract(user_id) + if not isinstance(snapshot, dict): + return None + + normalized_message = self._normalize_text(message).strip() + contract_number = str(snapshot.get("contrato_numero") or "").strip() + plate = str(snapshot.get("placa") or "").strip() + vehicle_model = str(snapshot.get("modelo_veiculo") or "").strip() + start_at = format_datetime_for_chat(snapshot.get("data_inicio")) if snapshot.get("data_inicio") else "" + due_at = format_datetime_for_chat(snapshot.get("data_fim_prevista")) if snapshot.get("data_fim_prevista") else "" + returned_at = format_datetime_for_chat(snapshot.get("data_devolucao")) if snapshot.get("data_devolucao") else "" + daily_rate = format_currency_br(snapshot.get("valor_diaria")) if snapshot.get("valor_diaria") is not None else "" + expected_total = format_currency_br(snapshot.get("valor_previsto")) if snapshot.get("valor_previsto") is not None else "" + final_total = format_currency_br(snapshot.get("valor_final")) if snapshot.get("valor_final") is not None else "" + payment_status = str(snapshot.get("status_pagamento") or "").strip() + payment_at = format_datetime_for_chat(snapshot.get("data_pagamento")) if snapshot.get("data_pagamento") else "" + payment_amount = format_currency_br(snapshot.get("valor_pagamento")) if snapshot.get("valor_pagamento") is not None else "" + + if ( + "devolucao" in normalized_message + or "devolver" in normalized_message + or "data de devolucao" in normalized_message + or "data da devolucao" in normalized_message + ): + if returned_at: + lines = [f"A devolucao do seu aluguel foi registrada em {returned_at}."] + elif due_at: + lines = [f"A devolucao prevista do seu aluguel e {due_at}."] + else: + lines = ["Nao encontrei a data de devolucao prevista do seu aluguel atual."] + if contract_number: + lines.append(f"Contrato: {contract_number}") + if vehicle_model: + lines.append(f"Veiculo: {vehicle_model}") + if plate: + lines.append(f"Placa: {plate}") + return "\n".join(lines) + + if "pagamento" in normalized_message or "comprovante" in normalized_message or "paguei" in normalized_message or "quitado" in normalized_message or "pago" in normalized_message: + if payment_status == "registrado" and payment_at and payment_amount: + lines = [f"O pagamento mais recente do seu aluguel ja foi registrado em {payment_at}, no valor de {payment_amount}."] + elif payment_status == "registrado" and payment_at: + lines = [f"O pagamento mais recente do seu aluguel ja foi registrado em {payment_at}."] + elif payment_status: + lines = [f"O status do pagamento do seu aluguel atual e: {payment_status}."] + else: + lines = ["O pagamento do seu aluguel atual esta em aberto."] + if contract_number: + lines.append(f"Contrato: {contract_number}") + if plate: + lines.append(f"Placa: {plate}") + return "\n".join(lines) + + if "diaria" in normalized_message: + if not daily_rate: + return "Nao encontrei a diaria do seu aluguel atual no contexto recente." + lines = [f"A diaria atual do seu aluguel e {daily_rate}."] + if contract_number: + lines.append(f"Contrato: {contract_number}") + if vehicle_model: + lines.append(f"Veiculo: {vehicle_model}") + return "\n".join(lines) + + if "valor" in normalized_message or "quanto" in normalized_message: + if final_total: + lines = [f"O valor final do seu aluguel esta em {final_total}."] + elif expected_total: + lines = [f"O valor previsto do seu aluguel esta em {expected_total}."] + else: + lines = ["Nao encontrei o valor do seu aluguel atual no contexto recente."] + if contract_number: + lines.append(f"Contrato: {contract_number}") + return "\n".join(lines) + + if "placa" in normalized_message: + if not plate: + return "Nao encontrei a placa do seu aluguel atual no contexto recente." + return f"A placa do seu aluguel atual e {plate}." + + if "contrato" in normalized_message: + if not contract_number: + return "Nao encontrei o numero do contrato do seu aluguel atual no contexto recente." + return f"O numero do contrato do seu aluguel atual e {contract_number}." + + if "inicio" in normalized_message or "retirada" in normalized_message: + if not start_at: + return "Nao encontrei a data de inicio do seu aluguel atual no contexto recente." + lines = [f"O inicio do seu aluguel foi em {start_at}."] + if contract_number: + lines.append(f"Contrato: {contract_number}") + return "\n".join(lines) + + lines = ["Resumo do seu aluguel atual:"] + if contract_number: + lines.append(f"Contrato: {contract_number}") + if vehicle_model: + lines.append(f"Veiculo: {vehicle_model}") + if plate: + lines.append(f"Placa: {plate}") + if start_at: + lines.append(f"Inicio: {start_at}") + if returned_at: + lines.append(f"Devolucao registrada: {returned_at}") + elif due_at: + lines.append(f"Devolucao prevista: {due_at}") + if daily_rate: + lines.append(f"Diaria: {daily_rate}") + if final_total: + lines.append(f"Valor final: {final_total}") + elif expected_total: + lines.append(f"Valor previsto: {expected_total}") + if payment_status == "registrado" and payment_at: + payment_line = f"Pagamento: registrado em {payment_at}" + if payment_amount: + payment_line += f" ({payment_amount})" + lines.append(payment_line) + elif payment_status: + lines.append(f"Pagamento: {payment_status}") + return "\n".join(lines) + + async def _try_handle_current_rental_info_request( + self, + message: str, + user_id: int | None, + finish, + ) -> str | None: + if user_id is None or not self._has_current_rental_info_request(message, user_id=user_id): + return None + response = self._build_current_rental_info_response(message=message, user_id=user_id) + if not response: + return None + return await finish(response) + # Evita tratar perguntas sobre devolucao como se fossem um encerramento real. def _looks_like_rental_return_question(self, message: str) -> bool: normalized_message = self._normalize_text(message).strip() diff --git a/scripts/stress_smoke.py b/scripts/stress_smoke.py index 78d70b7..80d526b 100644 --- a/scripts/stress_smoke.py +++ b/scripts/stress_smoke.py @@ -162,5 +162,5 @@ if __name__ == "__main__": parser.add_argument("--order-cycles", type=int, default=10) parser.add_argument("--race-attempts", type=int, default=5) parser.add_argument("--user-base", type=int, default=990000) - parser.add_argument("--cpf", default="10000000001") + parser.add_argument("--cpf", default="11144477735") asyncio.run(main(parser.parse_args())) diff --git a/tests/test_conversation_state_store.py b/tests/test_conversation_state_store.py new file mode 100644 index 0000000..39cf44d --- /dev/null +++ b/tests/test_conversation_state_store.py @@ -0,0 +1,39 @@ +import unittest + +from app.services.orchestration.conversation_state_store import ConversationStateStore + + +class ConversationStateStoreTests(unittest.TestCase): + def test_save_user_context_preserves_existing_expiration_when_missing(self): + store = ConversationStateStore() + store.upsert_user_context(1, ttl_minutes=30) + + original_expires_at = store.get_user_context(1)["expires_at"] + store.save_user_context( + 1, + { + "active_domain": "sales", + "active_task": "order_create", + "generic_memory": {}, + "shared_memory": {}, + "collected_slots": {}, + "flow_snapshots": {}, + "last_tool_result": None, + "order_queue": [], + "pending_order_selection": None, + "pending_switch": None, + "last_stock_results": [], + "selected_vehicle": None, + "last_rental_results": [], + "selected_rental_vehicle": None, + }, + ) + + stored_context = store.get_user_context(1) + self.assertEqual(stored_context["active_domain"], "sales") + self.assertEqual(stored_context["active_task"], "order_create") + self.assertEqual(stored_context["expires_at"], original_expires_at) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_telegram_multimodal.py b/tests/test_telegram_multimodal.py index f5820d5..07df1e7 100644 --- a/tests/test_telegram_multimodal.py +++ b/tests/test_telegram_multimodal.py @@ -3,6 +3,7 @@ import asyncio from types import SimpleNamespace from unittest.mock import AsyncMock, patch +import aiohttp from fastapi import HTTPException from app.integrations.telegram_satellite_service import ( @@ -42,6 +43,36 @@ class _FakeTelegramSession: return _FakeTelegramResponse(self.payload) +class _FlakyTelegramResponse: + def __init__(self, outcome): + self.outcome = outcome + + async def __aenter__(self): + if isinstance(self.outcome, BaseException): + raise self.outcome + return self + + async def __aexit__(self, exc_type, exc, tb): + return None + + async def json(self): + return self.outcome + + +class _FlakyTelegramSession: + def __init__(self, outcomes): + self.outcomes = list(outcomes) + self.calls = [] + + def post(self, url, json): + self.calls.append((url, json)) + if self.outcomes: + outcome = self.outcomes.pop(0) + else: + outcome = {"ok": True} + return _FlakyTelegramResponse(outcome) + + class TelegramMultimodalTests(unittest.IsolatedAsyncioTestCase): def _build_service(self) -> TelegramSatelliteService: service = TelegramSatelliteService( @@ -320,6 +351,47 @@ class TelegramMultimodalTests(unittest.IsolatedAsyncioTestCase): entry = service.state.get_entry(TELEGRAM_RUNTIME_BUCKET, TELEGRAM_RUNTIME_OWNER_ID) self.assertEqual(entry["last_update_id"], 14) + async def test_send_message_retries_transient_transport_failure(self): + service = self._build_service() + session = _FlakyTelegramSession( + [ + asyncio.TimeoutError(), + aiohttp.ClientConnectionError("falha temporaria"), + {"ok": True}, + ] + ) + + with patch("app.integrations.telegram_satellite_service.asyncio.sleep", AsyncMock()) as sleep_mock: + await service._send_message(session=session, chat_id=99, text="resposta teste") + + self.assertEqual(len(session.calls), 3) + self.assertEqual(sleep_mock.await_count, 2) + + async def test_handle_update_swallows_unexpected_delivery_failure(self): + service = self._build_service() + update = { + "update_id": 15, + "message": { + "message_id": 89, + "chat": {"id": 99}, + "from": {"id": 99}, + "text": "status do pedido", + }, + } + + with patch.object(service, "_extract_image_attachments", AsyncMock(return_value=[])), patch.object( + service, + "_process_message", + AsyncMock(return_value="Pedido encontrado."), + ), patch.object( + service, + "_send_message", + AsyncMock(side_effect=RuntimeError("falha inesperada de entrega")), + ), patch("app.integrations.telegram_satellite_service.logger.exception") as logger_exception: + await service._handle_update(session=SimpleNamespace(), update=update) + + self.assertTrue(logger_exception.called) + async def test_persist_last_processed_update_id_keeps_highest_seen_value(self): service = self._build_service() diff --git a/tests/test_turn_decision_contract.py b/tests/test_turn_decision_contract.py index f49b478..29c2b52 100644 --- a/tests/test_turn_decision_contract.py +++ b/tests/test_turn_decision_contract.py @@ -3,10 +3,16 @@ import unittest from types import SimpleNamespace from unittest.mock import patch +from sqlalchemy import create_engine +from sqlalchemy.orm import sessionmaker +from sqlalchemy.pool import StaticPool + os.environ.setdefault("DEBUG", "false") from datetime import datetime, timedelta from app.core.time_utils import utc_now +from app.db.mock_database import MockBase +from app.db.mock_models import RentalContract, RentalPayment, RentalVehicle from app.services.orchestration.conversation_policy import ConversationPolicy from app.services.orchestration.entity_normalizer import EntityNormalizer @@ -1306,6 +1312,109 @@ class TurnDecisionContractTests(unittest.IsolatedAsyncioTestCase): self.assertEqual(history_calls[0]["turn_status"], "completed") self.assertEqual(history_calls[0]["intent"], "general") + async def test_handle_message_restores_outer_turn_trace_after_nested_call(self): + state = FakeState( + contexts={ + 1: { + "active_domain": "general", + "generic_memory": {}, + "shared_memory": {}, + "order_queue": [], + "pending_order_selection": None, + "pending_switch": None, + "last_stock_results": [], + "selected_vehicle": None, + } + } + ) + history_calls = [] + service = OrquestradorService.__new__(OrquestradorService) + service.state = state + service.normalizer = EntityNormalizer() + service.policy = ConversationPolicy(service=service) + service.history_service = SimpleNamespace(record_turn=lambda **kwargs: history_calls.append(kwargs)) + service._empty_extraction_payload = service.normalizer.empty_extraction_payload + service._log_turn_event = lambda *args, **kwargs: None + service._compose_order_aware_response = lambda response, user_id, queue_notice=None: response + service._upsert_user_context = lambda user_id: None + service._get_user_context = lambda user_id: state.get_user_context(user_id) + service._save_user_context = lambda user_id, context: state.save_user_context(user_id, context) + + async def fake_maybe_auto_advance_next_order(base_response: str, user_id: int | None): + if base_response == "resposta externa": + nested_response = await service.handle_message("mensagem interna", user_id=user_id) + return f"{base_response}\n{nested_response}" + return base_response + + async def fake_extract_turn_decision(message: str, user_id: int | None): + return { + "intent": "general", + "domain": "general", + "action": "answer_user", + "entities": service.normalizer.empty_extraction_payload(), + "missing_fields": [], + "selection_index": None, + "tool_name": None, + "tool_arguments": {}, + "response_to_user": "resposta interna" if message == "mensagem interna" else "resposta externa", + } + + async def fake_extract_message_plan(message: str, user_id: int | None): + return {"orders": [{"domain": "general", "message": message}]} + + service._maybe_auto_advance_next_order = fake_maybe_auto_advance_next_order + service._extract_turn_decision_with_llm = fake_extract_turn_decision + service._extract_message_plan_with_llm = fake_extract_message_plan + service._prepare_message_for_single_order = lambda message, user_id, routing_plan=None: (message, None, None) + service._resolve_entities_for_message_plan = lambda message_plan, routed_message: service.normalizer.empty_extraction_payload() + + async def fake_try_handle_immediate_context_reset(**kwargs): + return None + + async def fake_try_resolve_pending_order_selection(**kwargs): + return None + + async def fake_try_continue_queued_order(**kwargs): + return None + + async def fake_try_execute_orchestration_control_tool(**kwargs): + return None + + async def fake_try_execute_business_tool_from_turn_decision(**kwargs): + return None + + service._try_handle_immediate_context_reset = fake_try_handle_immediate_context_reset + service._try_resolve_pending_order_selection = fake_try_resolve_pending_order_selection + service._try_continue_queued_order = fake_try_continue_queued_order + service._try_execute_orchestration_control_tool = fake_try_execute_orchestration_control_tool + service._try_execute_business_tool_from_turn_decision = fake_try_execute_business_tool_from_turn_decision + service._handle_context_switch = lambda **kwargs: None + service._update_active_domain = lambda **kwargs: None + + async def fake_extract_entities_with_llm(message: str, user_id: int | None): + return service.normalizer.empty_extraction_payload() + + async def fake_extract_missing_sales_search_context_with_llm(**kwargs): + return {} + + service._extract_entities_with_llm = fake_extract_entities_with_llm + service._extract_missing_sales_search_context_with_llm = fake_extract_missing_sales_search_context_with_llm + service._domain_from_intents = lambda intents: "general" + + response = await service.handle_message("mensagem externa", user_id=1) + + self.assertEqual(response, "resposta externa\nresposta interna") + self.assertEqual(len(history_calls), 2) + self.assertEqual( + {call["user_message"] for call in history_calls}, + {"mensagem externa", "mensagem interna"}, + ) + self.assertEqual( + {call["assistant_response"] for call in history_calls}, + {"resposta externa\nresposta interna", "resposta interna"}, + ) + self.assertEqual(len({call["request_id"] for call in history_calls}), 2) + async def test_handle_message_persists_failed_turn_history(self): state = FakeState( contexts={ @@ -2721,6 +2830,275 @@ class TurnDecisionContractTests(unittest.IsolatedAsyncioTestCase): ) + async def test_handle_message_short_circuits_for_current_rental_info_question(self): + state = FakeState( + contexts={ + 1: { + "active_domain": "general", + "generic_memory": {}, + "shared_memory": {}, + "order_queue": [], + "pending_order_selection": None, + "pending_switch": None, + "last_stock_results": [], + "selected_vehicle": None, + "last_rental_results": [], + "selected_rental_vehicle": None, + "last_rental_contract": { + "contrato_numero": "LOC-20260323-CAEECA1C", + "placa": "RAA1A02", + "modelo_veiculo": "Fiat Pulse", + "data_inicio": "2026-03-19T10:00:00", + "data_fim_prevista": "2026-03-21T10:00:00", + "valor_diaria": 189.9, + "valor_previsto": 379.8, + "status": "ativa", + "status_pagamento": "registrado", + "data_pagamento": "2026-03-23T15:47:00", + "valor_pagamento": 379.8, + }, + } + } + ) + service = OrquestradorService.__new__(OrquestradorService) + service.state = state + service.normalizer = EntityNormalizer() + service.policy = ConversationPolicy(service=service) + service._empty_extraction_payload = service.normalizer.empty_extraction_payload + service._log_turn_event = lambda *args, **kwargs: None + service._compose_order_aware_response = lambda response, user_id, queue_notice=None: response + service._get_user_context = lambda user_id: state.get_user_context(user_id) + service._save_user_context = lambda user_id, context: state.save_user_context(user_id, context) + + async def fake_maybe_auto_advance_next_order(base_response: str, user_id: int | None): + return base_response + + service._maybe_auto_advance_next_order = fake_maybe_auto_advance_next_order + service._upsert_user_context = lambda user_id: None + + async def fake_try_handle_pending_stock_selection_follow_up(**kwargs): + return None + + async def fake_try_handle_active_sales_follow_up(**kwargs): + return None + + async def fake_try_handle_pending_rental_selection_follow_up(**kwargs): + return None + + async def fake_try_handle_active_rental_follow_up(**kwargs): + return None + + async def fake_try_handle_active_review_follow_up(**kwargs): + return None + + service._try_handle_pending_stock_selection_follow_up = fake_try_handle_pending_stock_selection_follow_up + service._try_handle_active_sales_follow_up = fake_try_handle_active_sales_follow_up + service._try_handle_pending_rental_selection_follow_up = fake_try_handle_pending_rental_selection_follow_up + service._try_handle_active_rental_follow_up = fake_try_handle_active_rental_follow_up + service._try_handle_active_review_follow_up = fake_try_handle_active_review_follow_up + + async def fake_extract_turn_decision(message: str, user_id: int | None): + raise AssertionError("nao deveria consultar o LLM para consulta informativa do aluguel atual") + + service._extract_turn_decision_with_llm = fake_extract_turn_decision + + response = await service.handle_message( + "qual a data de devolucao do meu aluguel?", + user_id=1, + ) + + self.assertIn("A devolucao prevista do seu aluguel e 21/03/2026 10:00.", response) + self.assertIn("Contrato: LOC-20260323-CAEECA1C", response) + self.assertIn("Veiculo: Fiat Pulse", response) + + async def test_handle_message_rehydrates_current_rental_info_from_db_after_restart(self): + engine = create_engine( + "sqlite://", + connect_args={"check_same_thread": False}, + poolclass=StaticPool, + ) + SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine) + MockBase.metadata.create_all(bind=engine) + self.addCleanup(engine.dispose) + + db = SessionLocal() + try: + vehicle = RentalVehicle( + placa="RAA1A02", + modelo="Fiat Pulse", + categoria="suv", + ano=2024, + valor_diaria=189.9, + status="disponivel", + ) + db.add(vehicle) + db.commit() + db.refresh(vehicle) + + contract = RentalContract( + contrato_numero="LOC-20260323-CAEECA1C", + user_id=1, + rental_vehicle_id=vehicle.id, + placa=vehicle.placa, + modelo_veiculo=vehicle.modelo, + categoria=vehicle.categoria, + data_inicio=datetime(2026, 3, 19, 10, 0), + data_fim_prevista=datetime(2026, 3, 21, 10, 0), + data_devolucao=None, + valor_diaria=189.9, + valor_previsto=379.8, + valor_final=None, + status="ativa", + ) + db.add(contract) + db.commit() + db.refresh(contract) + + payment = RentalPayment( + protocolo="ALP-20260323-0B41DD0D", + user_id=1, + rental_contract_id=contract.id, + contrato_numero=contract.contrato_numero, + placa=contract.placa, + valor=379.8, + data_pagamento=datetime(2026, 3, 23, 15, 47), + favorecido="Locadora XPTO", + identificador_comprovante="NSU123456", + observacoes="pagamento da locacao", + ) + db.add(payment) + db.commit() + finally: + db.close() + + state = FakeState( + contexts={ + 1: { + "active_domain": "general", + "generic_memory": {}, + "shared_memory": {}, + "order_queue": [], + "pending_order_selection": None, + "pending_switch": None, + "last_stock_results": [], + "selected_vehicle": None, + "last_rental_results": [], + "selected_rental_vehicle": None, + } + } + ) + service = OrquestradorService.__new__(OrquestradorService) + service.state = state + service.normalizer = EntityNormalizer() + service.policy = ConversationPolicy(service=service) + service._empty_extraction_payload = service.normalizer.empty_extraction_payload + service._log_turn_event = lambda *args, **kwargs: None + service._compose_order_aware_response = lambda response, user_id, queue_notice=None: response + service._get_user_context = lambda user_id: state.get_user_context(user_id) + service._save_user_context = lambda user_id, context: state.save_user_context(user_id, context) + + async def fake_maybe_auto_advance_next_order(base_response: str, user_id: int | None): + return base_response + + service._maybe_auto_advance_next_order = fake_maybe_auto_advance_next_order + service._upsert_user_context = lambda user_id: None + + async def fake_try_handle_pending_stock_selection_follow_up(**kwargs): + return None + + async def fake_try_handle_active_sales_follow_up(**kwargs): + return None + + async def fake_try_handle_pending_rental_selection_follow_up(**kwargs): + return None + + async def fake_try_handle_active_rental_follow_up(**kwargs): + return None + + async def fake_try_handle_active_review_follow_up(**kwargs): + return None + + service._try_handle_pending_stock_selection_follow_up = fake_try_handle_pending_stock_selection_follow_up + service._try_handle_active_sales_follow_up = fake_try_handle_active_sales_follow_up + service._try_handle_pending_rental_selection_follow_up = fake_try_handle_pending_rental_selection_follow_up + service._try_handle_active_rental_follow_up = fake_try_handle_active_rental_follow_up + service._try_handle_active_review_follow_up = fake_try_handle_active_review_follow_up + + async def fake_extract_turn_decision(message: str, user_id: int | None): + raise AssertionError("nao deveria consultar o LLM para consulta informativa do aluguel apos restart") + + service._extract_turn_decision_with_llm = fake_extract_turn_decision + + with patch("app.services.flows.rental_flow_support.SessionMockLocal", SessionLocal): + response = await service.handle_message( + "qual a data de devolucao do meu aluguel?", + user_id=1, + ) + + self.assertIn("A devolucao prevista do seu aluguel e 21/03/2026 10:00.", response) + self.assertIn("Contrato: LOC-20260323-CAEECA1C", response) + self.assertIn("Veiculo: Fiat Pulse", response) + snapshot = state.get_user_context(1)["last_rental_contract"] + self.assertEqual(snapshot["contrato_numero"], "LOC-20260323-CAEECA1C") + self.assertEqual(snapshot["status_pagamento"], "registrado") + self.assertEqual(snapshot["data_fim_prevista"], "2026-03-21T10:00:00") + + def test_store_last_rental_contract_preserves_contract_snapshot_after_payment_update(self): + state = FakeState( + contexts={ + 1: { + "active_domain": "general", + "generic_memory": {}, + "shared_memory": {}, + "order_queue": [], + "pending_order_selection": None, + "pending_switch": None, + "last_stock_results": [], + "selected_vehicle": None, + "last_rental_results": [], + "selected_rental_vehicle": None, + } + } + ) + service = OrquestradorService.__new__(OrquestradorService) + service.state = state + service.normalizer = EntityNormalizer() + service._get_user_context = lambda user_id: state.get_user_context(user_id) + service._save_user_context = lambda user_id, context: state.save_user_context(user_id, context) + + service._store_last_rental_contract( + user_id=1, + payload={ + "contrato_numero": "LOC-20260323-CAEECA1C", + "placa": "RAA1A02", + "modelo_veiculo": "Fiat Pulse", + "data_inicio": "2026-03-19T10:00:00", + "data_fim_prevista": "2026-03-21T10:00:00", + "valor_diaria": 189.9, + "valor_previsto": 379.8, + "status": "ativa", + }, + ) + service._store_last_rental_contract( + user_id=1, + payload={ + "contrato_numero": "LOC-20260323-CAEECA1C", + "placa": "RAA1A02", + "valor": 379.8, + "data_pagamento": "2026-03-23T15:47:00", + "favorecido": "Locadora XPTO", + "status": "registrado", + }, + ) + + snapshot = state.get_user_context(1)["last_rental_contract"] + self.assertEqual(snapshot["modelo_veiculo"], "Fiat Pulse") + self.assertEqual(snapshot["data_fim_prevista"], "2026-03-21T10:00:00") + self.assertEqual(snapshot["status"], "ativa") + self.assertEqual(snapshot["status_pagamento"], "registrado") + self.assertEqual(snapshot["data_pagamento"], "2026-03-23T15:47:00") + self.assertEqual(snapshot["valor_pagamento"], 379.8) + def test_has_rental_return_management_request_ignores_return_question_even_with_last_contract(self): state = FakeState( contexts={ @@ -3715,6 +4093,73 @@ class TurnDecisionContractTests(unittest.IsolatedAsyncioTestCase): self.assertIn("Vou comecar por: Venda: fazer pedido", response) + async def test_pending_order_selection_promotes_new_operational_request_before_previous_options(self): + state = FakeState( + contexts={ + 9: { + "pending_order_selection": { + "orders": [ + {"domain": "sales", "message": "compra", "seed_message": "quero comprar um veiculo", "memory_seed": {}}, + {"domain": "review", "message": "revisao", "seed_message": "quero agendar revisao", "memory_seed": {}}, + {"domain": "rental", "message": "aluguel", "seed_message": "quero alugar um carro", "memory_seed": {}}, + ], + "expires_at": utc_now() + timedelta(minutes=15), + }, + "order_queue": [], + "active_domain": "general", + "generic_memory": {}, + } + } + ) + service = FakePolicyService(state) + policy = ConversationPolicy(service=service) + + response = await policy.try_resolve_pending_order_selection( + message="quais pedidos eu tenho?", + user_id=9, + turn_decision={"domain": "sales", "intent": "order_list", "action": "call_tool", "tool_name": "listar_pedidos"}, + ) + + self.assertIsNone(response) + context = state.get_user_context(9) + self.assertIsNone(context["pending_order_selection"]) + self.assertEqual([item["domain"] for item in context["order_queue"]], ["sales", "review", "rental"]) + self.assertEqual(context["order_queue"][0]["message"], "quero comprar um veiculo") + + async def test_pending_order_selection_skips_duplicate_base_task_when_new_request_is_more_specific(self): + state = FakeState( + contexts={ + 9: { + "pending_order_selection": { + "orders": [ + {"domain": "sales", "message": "compra", "seed_message": "quero comprar um veiculo", "memory_seed": {}}, + {"domain": "review", "message": "revisao", "seed_message": "quero agendar revisao", "memory_seed": {}}, + {"domain": "rental", "message": "aluguel", "seed_message": "quero alugar um carro", "memory_seed": {}}, + ], + "expires_at": utc_now() + timedelta(minutes=15), + }, + "order_queue": [], + "active_domain": "general", + "generic_memory": {}, + } + } + ) + service = FakePolicyService(state) + policy = ConversationPolicy(service=service) + + response = await policy.try_resolve_pending_order_selection( + message="quero comprar um suv ate 95 mil", + user_id=9, + turn_decision={"domain": "sales", "intent": "order_create", "action": "collect_order_create"}, + ) + + self.assertIn("Perfeito. Vou comecar por: Venda: compra", response) + self.assertIn("handled:quero comprar um suv ate 95 mil", response) + context = state.get_user_context(9) + self.assertIsNone(context["pending_order_selection"]) + self.assertEqual(context["active_domain"], "sales") + self.assertEqual([item["domain"] for item in context["order_queue"]], ["review", "rental"]) + async def test_try_continue_queue_prefers_turn_decision_action(self): state = FakeState( contexts={ @@ -3812,6 +4257,111 @@ class TurnDecisionContractTests(unittest.IsolatedAsyncioTestCase): self.assertIsNone(early_response) self.assertEqual(service._get_user_context(9).get("order_queue"), []) + def test_prepare_message_for_single_order_requests_clarification_for_three_actionable_domains(self): + state = FakeState( + contexts={ + 9: { + "active_domain": "general", + "generic_memory": {}, + "order_queue": [], + "pending_order_selection": None, + "pending_switch": None, + } + }, + ) + service = FakePolicyService(state) + policy = ConversationPolicy(service=service) + + routed_message, queue_notice, early_response = policy.prepare_message_for_single_order( + message="oi, pode me ajudar com compra, revisao e aluguel?", + user_id=9, + routing_plan={ + "orders": [ + {"domain": "sales", "message": "compra"}, + {"domain": "review", "message": "revisao"}, + ] + }, + ) + + self.assertEqual(routed_message, "oi, pode me ajudar com compra, revisao e aluguel?") + self.assertIsNone(queue_notice) + self.assertIn("Identifiquei 3 acoes", early_response) + self.assertIn("3. Locacao: aluguel", early_response) + pending = state.get_user_context(9)["pending_order_selection"] + self.assertEqual(len(pending["orders"]), 3) + + def test_prepare_message_for_single_order_counts_only_orders_effectively_queued(self): + state = FakeState( + entries={ + "pending_review_drafts": { + 9: { + "payload": {"placa": "ABC1234"}, + "expires_at": utc_now() + timedelta(minutes=15), + } + } + }, + contexts={ + 9: { + "active_domain": "review", + "generic_memory": {}, + "order_queue": [], + "pending_order_selection": None, + "pending_switch": None, + } + }, + ) + service = FakePolicyService(state) + policy = ConversationPolicy(service=service) + + routed_message, queue_notice, early_response = policy.prepare_message_for_single_order( + message="quero continuar a revisao e tambem ver aluguel", + user_id=9, + routing_plan={ + "orders": [ + {"domain": "review", "message": "quero continuar a revisao"}, + {"domain": "general", "message": "oi"}, + {"domain": "rental", "message": "quero ver aluguel"}, + ] + }, + ) + + self.assertEqual(routed_message, "quero continuar a revisao e tambem ver aluguel") + self.assertIn("Anotei mais 1 pedido", early_response) + self.assertEqual(len(state.get_user_context(9)["order_queue"]), 1) + self.assertEqual(state.get_user_context(9)["order_queue"][0]["domain"], "rental") + self.assertEqual(state.get_user_context(9)["order_queue"][0]["message"], "quero ver aluguel") + + async def test_pending_order_selection_uses_canonical_seed_message_for_selected_domain(self): + state = FakeState( + contexts={ + 9: { + "active_domain": "general", + "generic_memory": {}, + "order_queue": [], + "pending_order_selection": None, + "pending_switch": None, + } + } + ) + service = FakePolicyService(state) + policy = ConversationPolicy(service=service) + policy.store_pending_order_selection( + user_id=9, + orders=[ + {"domain": "sales", "message": "compra", "entities": service.normalizer.empty_extraction_payload()}, + {"domain": "review", "message": "revisao", "entities": service.normalizer.empty_extraction_payload()}, + {"domain": "rental", "message": "aluguel", "entities": service.normalizer.empty_extraction_payload()}, + ], + ) + + response = await policy.try_resolve_pending_order_selection(message="1", user_id=9) + + self.assertIn("Perfeito. Vou comecar por: Venda: compra", response) + self.assertIn("handled:quero comprar um veiculo", response) + context = state.get_user_context(9) + self.assertEqual(context["active_domain"], "sales") + self.assertEqual([item["domain"] for item in context["order_queue"]], ["review", "rental"]) + async def test_tool_continuar_proximo_pedido_reports_empty_queue(self): state = FakeState(