import argparse import asyncio import os import sys from pathlib import Path from dotenv import load_dotenv from fastapi import HTTPException PROJECT_ROOT = Path(__file__).resolve().parents[1] if str(PROJECT_ROOT) not in sys.path: sys.path.insert(0, str(PROJECT_ROOT)) def configure_environment(dotenv_path: str, backend: str, redis_url: str) -> None: env_path = Path(dotenv_path) if env_path.exists(): load_dotenv(env_path) os.environ["CONVERSATION_STATE_BACKEND"] = backend os.environ["REDIS_URL"] = redis_url os.environ.setdefault("DEBUG", "false") async def run_state_stress(repo, iterations: int, user_base: int) -> None: print(f"[state] iniciando {iterations} iteracao(oes) no backend {type(repo).__name__}") touched_user_ids: list[int] = [] for offset in range(iterations): user_id = user_base + offset touched_user_ids.append(user_id) repo.upsert_user_context(user_id, ttl_minutes=60) repo.save_user_context( user_id, { "active_domain": "sales", "active_task": "order_create", "generic_memory": {"cpf": "12345678909", "orcamento_max": 80000 + offset}, "shared_memory": {}, "collected_slots": {}, "flow_snapshots": {}, "last_tool_result": None, "order_queue": [], "pending_order_selection": None, "pending_switch": None, "last_stock_results": [], "selected_vehicle": None, }, ) repo.set_entry( "pending_order_drafts", user_id, { "payload": {"cpf": "12345678909", "vehicle_id": 8}, }, ) context = repo.get_user_context(user_id) draft = repo.get_entry("pending_order_drafts", user_id, expire=True) if not context or context.get("active_task") != "order_create": raise RuntimeError(f"contexto nao persistido corretamente para user_id={user_id}") if not draft or draft.get("payload", {}).get("vehicle_id") != 8: raise RuntimeError(f"draft nao persistido corretamente para user_id={user_id}") repo.pop_entry("pending_order_drafts", user_id) if hasattr(repo, "redis"): keys_to_delete = [] for user_id in touched_user_ids: keys_to_delete.append(f"{repo.key_prefix}:user_contexts:{user_id}") keys_to_delete.append(f"{repo.key_prefix}:pending_order_drafts:{user_id}") if keys_to_delete: repo.redis.delete(*keys_to_delete) print("[state] ok") async def run_order_cycles(order_cycles: int, cpf: str) -> None: from app.services.domain.inventory_service import consultar_estoque from app.services.domain.order_service import cancelar_pedido, listar_pedidos, realizar_pedido print(f"[orders] iniciando {order_cycles} ciclo(s) completos") for cycle in range(order_cycles): estoque = await consultar_estoque(preco_max=80000, limite=1, ordenar_preco="asc") if not estoque: raise RuntimeError("nenhum veiculo encontrado para o ciclo de pedido") vehicle_id = int(estoque[0]["id"]) pedido = await realizar_pedido(cpf=cpf, vehicle_id=vehicle_id, user_id=None) pedidos = await listar_pedidos(cpf=cpf, limite=10) if not any(item.get("numero_pedido") == pedido.get("numero_pedido") for item in pedidos): raise RuntimeError("pedido criado nao apareceu na listagem") cancelado = await cancelar_pedido( numero_pedido=pedido["numero_pedido"], motivo=f"stress-cycle-{cycle}", user_id=None, ) if cancelado.get("status") != "Cancelado": raise RuntimeError("pedido nao foi cancelado corretamente no ciclo de estresse") print("[orders] ok") def _sync_create_order(cpf: str, vehicle_id: int): from app.services.domain.order_service import realizar_pedido return asyncio.run(realizar_pedido(cpf=cpf, vehicle_id=vehicle_id, user_id=None)) async def run_reservation_race(attempts: int, cpf: str) -> None: from app.services.domain.inventory_service import consultar_estoque from app.services.domain.order_service import cancelar_pedido print(f"[race] iniciando corrida com {attempts} tentativa(s) para o mesmo veiculo") estoque = await consultar_estoque(preco_max=80000, limite=1, ordenar_preco="asc") if not estoque: raise RuntimeError("nenhum veiculo encontrado para a corrida de reserva") vehicle_id = int(estoque[0]["id"]) tasks = [asyncio.to_thread(_sync_create_order, cpf, vehicle_id) for _ in range(attempts)] results = await asyncio.gather(*tasks, return_exceptions=True) successes = [result for result in results if isinstance(result, dict)] conflict_codes = {"vehicle_already_reserved", "vehicle_reservation_busy"} conflicts = [ result for result in results if isinstance(result, HTTPException) and isinstance(result.detail, dict) and result.detail.get("code") in conflict_codes ] unexpected = [result for result in results if result not in successes and result not in conflicts] if len(successes) != 1: raise RuntimeError(f"corrida de reserva retornou {len(successes)} sucesso(s); esperado exatamente 1") if len(conflicts) != attempts - 1: raise RuntimeError(f"corrida de reserva retornou {len(conflicts)} conflito(s); esperado {attempts - 1}") if unexpected: raise RuntimeError(f"corrida de reserva retornou erro(s) inesperado(s): {unexpected!r}") await cancelar_pedido( numero_pedido=successes[0]["numero_pedido"], motivo="stress-race-cleanup", user_id=None, ) print("[race] ok") async def main(args) -> None: configure_environment(args.dotenv, backend=args.backend, redis_url=args.redis_url) import app.services.orchestration.state_repository_factory as factory from app.db.init_db import init_db from app.services.orchestration.state_repository_factory import get_conversation_state_repository factory._state_repository = None init_db() repo = get_conversation_state_repository() await run_state_stress(repo=repo, iterations=args.state_iterations, user_base=args.user_base) await run_order_cycles(order_cycles=args.order_cycles, cpf=args.cpf) await run_reservation_race(attempts=args.race_attempts, cpf=args.cpf) print("[done] stress smoke concluido com sucesso") if __name__ == "__main__": parser = argparse.ArgumentParser(description="Stress smoke do orquestrador com Redis e fluxo de pedidos") parser.add_argument("--dotenv", default=".env.local") parser.add_argument("--backend", default="redis") parser.add_argument("--redis-url", default="redis://127.0.0.1:6379/0") parser.add_argument("--state-iterations", type=int, default=50) parser.add_argument("--order-cycles", type=int, default=10) parser.add_argument("--race-attempts", type=int, default=5) parser.add_argument("--user-base", type=int, default=990000) parser.add_argument("--cpf", default="11144477735") asyncio.run(main(parser.parse_args()))