From 71592c544e47f1e39a24e59a56b6b1069bef5009 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Vitor=20Hugo=20Belorio=20Sim=C3=A3o?= Date: Mon, 16 Mar 2026 16:35:10 -0300 Subject: [PATCH] =?UTF-8?q?=F0=9F=9B=A1=EF=B8=8F=20fix(order):=20blindar?= =?UTF-8?q?=20reserva=20concorrente=20de=20veiculos=20e=20adicionar=20stre?= =?UTF-8?q?ss=20smoke?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - protege a criacao de pedidos com lock nomeado no MySQL e tentativa de row lock no veiculo antes da persistencia - reaproveita a checagem final de reserva apos o lock para evitar dupla reserva do mesmo veiculo em atendimentos concorrentes - adiciona regressao dedicada para garantir o uso de with_for_update e o conflito vehicle_already_reserved - inclui um stress smoke para validar persistencia de estado, ciclos completos de pedido e corrida de reserva com Redis --- app/services/domain/order_service.py | 114 ++++++++++++++---- scripts/stress_smoke.py | 166 +++++++++++++++++++++++++++ tests/test_order_service.py | 163 ++++++++++++++++++++++++++ 3 files changed, 419 insertions(+), 24 deletions(-) create mode 100644 scripts/stress_smoke.py create mode 100644 tests/test_order_service.py diff --git a/app/services/domain/order_service.py b/app/services/domain/order_service.py index b36c434..2b8bc55 100644 --- a/app/services/domain/order_service.py +++ b/app/services/domain/order_service.py @@ -1,4 +1,3 @@ -from datetime import datetime from app.core.time_utils import utc_now from typing import Any from uuid import uuid4 @@ -17,6 +16,57 @@ from app.services.user.mock_customer_service import hydrate_mock_customer_from_c # Responsabilidade: regra de pedido. +def _get_vehicle_for_update(db, vehicle_id: int) -> Vehicle | None: + return ( + db.query(Vehicle) + .filter(Vehicle.id == vehicle_id) + .with_for_update() + .first() + ) + + +def _get_active_order_for_vehicle(db, vehicle_id: int) -> Order | None: + return ( + db.query(Order) + .filter(Order.vehicle_id == vehicle_id) + .filter(Order.status != "Cancelado") + .first() + ) + + +def _acquire_vehicle_reservation_lock(db, vehicle_id: int, timeout_seconds: int = 5) -> str | None: + lock_name = f"orquestrador:vehicle_reservation:{vehicle_id}" + try: + acquired = db.execute( + text("SELECT GET_LOCK(:lock_name, :timeout_seconds)"), + {"lock_name": lock_name, "timeout_seconds": timeout_seconds}, + ).scalar() + except (OperationalError, SQLAlchemyError): + return None + + if int(acquired or 0) != 1: + raise_tool_http_error( + status_code=409, + code="vehicle_reservation_busy", + message="Outro atendimento esta finalizando a reserva deste veiculo. Tente novamente.", + retryable=True, + field="vehicle_id", + ) + return lock_name + + +def _release_vehicle_reservation_lock(db, lock_name: str | None) -> None: + if not lock_name: + return + try: + db.execute( + text("SELECT RELEASE_LOCK(:lock_name)"), + {"lock_name": lock_name}, + ) + except (OperationalError, SQLAlchemyError): + pass + + async def listar_pedidos( user_id: int | None = None, cpf: str | None = None, @@ -161,6 +211,7 @@ async def realizar_pedido( ) -> dict[str, Any]: cpf_norm = normalize_cpf(cpf) db = SessionMockLocal() + reservation_lock_name: str | None = None try: vehicle = db.query(Vehicle).filter(Vehicle.id == vehicle_id).first() if not vehicle: @@ -172,27 +223,6 @@ async def realizar_pedido( field="vehicle_id", ) - existing_order = None - try: - existing_order = ( - db.query(Order) - .filter(Order.vehicle_id == vehicle_id) - .filter(Order.status != "Cancelado") - .first() - ) - except (OperationalError, SQLAlchemyError) as exc: - if not is_legacy_schema_issue(exc): - raise - db.rollback() - if existing_order: - raise_tool_http_error( - status_code=409, - code="vehicle_already_reserved", - message="Este veiculo ja esta reservado e nao aparece mais no estoque disponivel.", - retryable=True, - field="vehicle_id", - ) - valor_veiculo = float(vehicle.preco) modelo_veiculo = str(vehicle.modelo) @@ -210,6 +240,41 @@ async def realizar_pedido( field="cpf", ) + reservation_lock_name = _acquire_vehicle_reservation_lock(db, vehicle_id) + + try: + locked_vehicle = _get_vehicle_for_update(db, vehicle_id) + except (OperationalError, SQLAlchemyError) as exc: + db.rollback() + if not is_legacy_schema_issue(exc): + raise + locked_vehicle = db.query(Vehicle).filter(Vehicle.id == vehicle_id).first() + + if not locked_vehicle: + raise_tool_http_error( + status_code=404, + code="vehicle_not_found", + message="Veiculo nao encontrado no estoque.", + retryable=True, + field="vehicle_id", + ) + + existing_order = None + try: + existing_order = _get_active_order_for_vehicle(db, vehicle_id) + except (OperationalError, SQLAlchemyError) as exc: + if not is_legacy_schema_issue(exc): + raise + db.rollback() + if existing_order: + raise_tool_http_error( + status_code=409, + code="vehicle_already_reserved", + message="Este veiculo ja esta reservado e nao aparece mais no estoque disponivel.", + retryable=True, + field="vehicle_id", + ) + numero_pedido = f"PED-{utc_now().strftime('%Y%m%d%H%M%S')}-{uuid4().hex[:6].upper()}" if user_id is not None: user = db.query(User).filter(User.id == user_id).first() @@ -220,7 +285,7 @@ async def realizar_pedido( numero_pedido=numero_pedido, user_id=user_id, cpf=cpf_norm, - vehicle_id=vehicle.id, + vehicle_id=locked_vehicle.id, modelo_veiculo=modelo_veiculo, valor_veiculo=valor_veiculo, status="Ativo", @@ -251,7 +316,7 @@ async def realizar_pedido( "numero_pedido": numero_pedido, "user_id": user_id, "cpf": cpf_norm, - "vehicle_id": vehicle.id, + "vehicle_id": locked_vehicle.id, "modelo_veiculo": modelo_veiculo, "status": "Ativo", "status_veiculo": "Reservado", @@ -271,4 +336,5 @@ async def realizar_pedido( "aprovado_credito": True, } finally: + _release_vehicle_reservation_lock(db, reservation_lock_name) db.close() diff --git a/scripts/stress_smoke.py b/scripts/stress_smoke.py new file mode 100644 index 0000000..78d70b7 --- /dev/null +++ b/scripts/stress_smoke.py @@ -0,0 +1,166 @@ +import argparse +import asyncio +import os +import sys +from pathlib import Path + +from dotenv import load_dotenv +from fastapi import HTTPException + +PROJECT_ROOT = Path(__file__).resolve().parents[1] +if str(PROJECT_ROOT) not in sys.path: + sys.path.insert(0, str(PROJECT_ROOT)) + + +def configure_environment(dotenv_path: str, backend: str, redis_url: str) -> None: + env_path = Path(dotenv_path) + if env_path.exists(): + load_dotenv(env_path) + os.environ["CONVERSATION_STATE_BACKEND"] = backend + os.environ["REDIS_URL"] = redis_url + os.environ.setdefault("DEBUG", "false") + + +async def run_state_stress(repo, iterations: int, user_base: int) -> None: + print(f"[state] iniciando {iterations} iteracao(oes) no backend {type(repo).__name__}") + touched_user_ids: list[int] = [] + for offset in range(iterations): + user_id = user_base + offset + touched_user_ids.append(user_id) + repo.upsert_user_context(user_id, ttl_minutes=60) + repo.save_user_context( + user_id, + { + "active_domain": "sales", + "active_task": "order_create", + "generic_memory": {"cpf": "12345678909", "orcamento_max": 80000 + offset}, + "shared_memory": {}, + "collected_slots": {}, + "flow_snapshots": {}, + "last_tool_result": None, + "order_queue": [], + "pending_order_selection": None, + "pending_switch": None, + "last_stock_results": [], + "selected_vehicle": None, + }, + ) + repo.set_entry( + "pending_order_drafts", + user_id, + { + "payload": {"cpf": "12345678909", "vehicle_id": 8}, + }, + ) + context = repo.get_user_context(user_id) + draft = repo.get_entry("pending_order_drafts", user_id, expire=True) + if not context or context.get("active_task") != "order_create": + raise RuntimeError(f"contexto nao persistido corretamente para user_id={user_id}") + if not draft or draft.get("payload", {}).get("vehicle_id") != 8: + raise RuntimeError(f"draft nao persistido corretamente para user_id={user_id}") + repo.pop_entry("pending_order_drafts", user_id) + + if hasattr(repo, "redis"): + keys_to_delete = [] + for user_id in touched_user_ids: + keys_to_delete.append(f"{repo.key_prefix}:user_contexts:{user_id}") + keys_to_delete.append(f"{repo.key_prefix}:pending_order_drafts:{user_id}") + if keys_to_delete: + repo.redis.delete(*keys_to_delete) + print("[state] ok") + + +async def run_order_cycles(order_cycles: int, cpf: str) -> None: + from app.services.domain.inventory_service import consultar_estoque + from app.services.domain.order_service import cancelar_pedido, listar_pedidos, realizar_pedido + + print(f"[orders] iniciando {order_cycles} ciclo(s) completos") + for cycle in range(order_cycles): + estoque = await consultar_estoque(preco_max=80000, limite=1, ordenar_preco="asc") + if not estoque: + raise RuntimeError("nenhum veiculo encontrado para o ciclo de pedido") + vehicle_id = int(estoque[0]["id"]) + pedido = await realizar_pedido(cpf=cpf, vehicle_id=vehicle_id, user_id=None) + pedidos = await listar_pedidos(cpf=cpf, limite=10) + if not any(item.get("numero_pedido") == pedido.get("numero_pedido") for item in pedidos): + raise RuntimeError("pedido criado nao apareceu na listagem") + cancelado = await cancelar_pedido( + numero_pedido=pedido["numero_pedido"], + motivo=f"stress-cycle-{cycle}", + user_id=None, + ) + if cancelado.get("status") != "Cancelado": + raise RuntimeError("pedido nao foi cancelado corretamente no ciclo de estresse") + print("[orders] ok") + + +def _sync_create_order(cpf: str, vehicle_id: int): + from app.services.domain.order_service import realizar_pedido + + return asyncio.run(realizar_pedido(cpf=cpf, vehicle_id=vehicle_id, user_id=None)) + + +async def run_reservation_race(attempts: int, cpf: str) -> None: + from app.services.domain.inventory_service import consultar_estoque + from app.services.domain.order_service import cancelar_pedido + + print(f"[race] iniciando corrida com {attempts} tentativa(s) para o mesmo veiculo") + estoque = await consultar_estoque(preco_max=80000, limite=1, ordenar_preco="asc") + if not estoque: + raise RuntimeError("nenhum veiculo encontrado para a corrida de reserva") + vehicle_id = int(estoque[0]["id"]) + + tasks = [asyncio.to_thread(_sync_create_order, cpf, vehicle_id) for _ in range(attempts)] + results = await asyncio.gather(*tasks, return_exceptions=True) + + successes = [result for result in results if isinstance(result, dict)] + conflict_codes = {"vehicle_already_reserved", "vehicle_reservation_busy"} + conflicts = [ + result for result in results + if isinstance(result, HTTPException) and isinstance(result.detail, dict) and result.detail.get("code") in conflict_codes + ] + unexpected = [result for result in results if result not in successes and result not in conflicts] + + if len(successes) != 1: + raise RuntimeError(f"corrida de reserva retornou {len(successes)} sucesso(s); esperado exatamente 1") + if len(conflicts) != attempts - 1: + raise RuntimeError(f"corrida de reserva retornou {len(conflicts)} conflito(s); esperado {attempts - 1}") + if unexpected: + raise RuntimeError(f"corrida de reserva retornou erro(s) inesperado(s): {unexpected!r}") + + await cancelar_pedido( + numero_pedido=successes[0]["numero_pedido"], + motivo="stress-race-cleanup", + user_id=None, + ) + print("[race] ok") + + +async def main(args) -> None: + configure_environment(args.dotenv, backend=args.backend, redis_url=args.redis_url) + + import app.services.orchestration.state_repository_factory as factory + from app.db.init_db import init_db + from app.services.orchestration.state_repository_factory import get_conversation_state_repository + + factory._state_repository = None + init_db() + repo = get_conversation_state_repository() + + await run_state_stress(repo=repo, iterations=args.state_iterations, user_base=args.user_base) + await run_order_cycles(order_cycles=args.order_cycles, cpf=args.cpf) + await run_reservation_race(attempts=args.race_attempts, cpf=args.cpf) + print("[done] stress smoke concluido com sucesso") + + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="Stress smoke do orquestrador com Redis e fluxo de pedidos") + parser.add_argument("--dotenv", default=".env.local") + parser.add_argument("--backend", default="redis") + parser.add_argument("--redis-url", default="redis://127.0.0.1:6379/0") + parser.add_argument("--state-iterations", type=int, default=50) + parser.add_argument("--order-cycles", type=int, default=10) + parser.add_argument("--race-attempts", type=int, default=5) + parser.add_argument("--user-base", type=int, default=990000) + parser.add_argument("--cpf", default="10000000001") + asyncio.run(main(parser.parse_args())) diff --git a/tests/test_order_service.py b/tests/test_order_service.py new file mode 100644 index 0000000..18e79a3 --- /dev/null +++ b/tests/test_order_service.py @@ -0,0 +1,163 @@ +import os +import unittest +from datetime import datetime +from types import SimpleNamespace +from unittest.mock import patch + +os.environ.setdefault("DEBUG", "false") + +from fastapi import HTTPException + +from app.db.mock_models import Vehicle +from app.services.domain import order_service + + +class LockingQuery: + def __init__(self, result): + self.result = result + self.with_for_update_called = False + + def filter(self, *args, **kwargs): + return self + + def with_for_update(self): + self.with_for_update_called = True + return self + + def first(self): + return self.result + + +class LockingSession: + def __init__(self, result): + self.query_instance = LockingQuery(result) + + def query(self, model): + return self.query_instance + + +class FakeSession: + def __init__(self, vehicle=None, user=None): + self.vehicle = vehicle + self.user = user + self.added = [] + self.committed = False + self.closed = False + self.rolled_back = False + self.refreshed = [] + + def query(self, model): + if model is order_service.Vehicle: + return LockingQuery(self.vehicle) + if model is order_service.User: + return LockingQuery(self.user) + raise AssertionError(f"unexpected model query: {model}") + + def execute(self, statement, params=None): + sql_text = str(statement) + if "GET_LOCK" in sql_text: + return SimpleNamespace(scalar=lambda: 1) + if "RELEASE_LOCK" in sql_text: + return SimpleNamespace(scalar=lambda: 1) + raise AssertionError(f"unexpected execute call: {sql_text}") + + def add(self, item): + self.added.append(item) + + def commit(self): + self.committed = True + + def rollback(self): + self.rolled_back = True + + def refresh(self, item): + self.refreshed.append(item) + + def close(self): + self.closed = True + + +class OrderServiceReservationTests(unittest.IsolatedAsyncioTestCase): + def test_get_vehicle_for_update_uses_row_lock(self): + vehicle = Vehicle(id=8, modelo="Toyota Corolla 2024", categoria="suv", preco=76087.0) + session = LockingSession(vehicle) + + locked_vehicle = order_service._get_vehicle_for_update(session, 8) + + self.assertIs(locked_vehicle, vehicle) + self.assertTrue(session.query_instance.with_for_update_called) + + async def test_realizar_pedido_raises_conflict_when_locked_vehicle_is_already_reserved(self): + vehicle = Vehicle(id=8, modelo="Toyota Corolla 2024", categoria="suv", preco=76087.0) + session = FakeSession(vehicle=vehicle) + existing_order = SimpleNamespace(numero_pedido="PED-RESERVA-001", status="Ativo") + + async def fake_hydrate_mock_customer_from_cpf(cpf: str, user_id: int | None = None): + return {"cpf": cpf, "user_id": user_id} + + async def fake_validar_cliente_venda(cpf: str, valor_veiculo: float): + return {"aprovado": True} + + with patch.object(order_service, "SessionMockLocal", return_value=session), patch.object( + order_service, + "hydrate_mock_customer_from_cpf", + new=fake_hydrate_mock_customer_from_cpf, + ), patch.object( + order_service, + "validar_cliente_venda", + new=fake_validar_cliente_venda, + ), patch.object(order_service, "_get_vehicle_for_update", return_value=vehicle) as locked_vehicle, patch.object( + order_service, + "_get_active_order_for_vehicle", + return_value=existing_order, + ): + with self.assertRaises(HTTPException) as ctx: + await order_service.realizar_pedido(cpf="123.456.789-09", vehicle_id=8) + + self.assertEqual(ctx.exception.status_code, 409) + self.assertEqual(ctx.exception.detail["code"], "vehicle_already_reserved") + locked_vehicle.assert_called_once_with(session, 8) + self.assertEqual(session.added, []) + self.assertTrue(session.closed) + + async def test_realizar_pedido_uses_locked_vehicle_before_persisting_order(self): + vehicle = Vehicle(id=8, modelo="Toyota Corolla 2024", categoria="suv", preco=76087.0) + session = FakeSession(vehicle=vehicle) + fake_uuid = SimpleNamespace(hex="abc123def456") + fixed_now = datetime(2026, 3, 16, 17, 30, 0) + + async def fake_hydrate_mock_customer_from_cpf(cpf: str, user_id: int | None = None): + return {"cpf": cpf, "user_id": user_id} + + async def fake_validar_cliente_venda(cpf: str, valor_veiculo: float): + return {"aprovado": True} + + with patch.object(order_service, "SessionMockLocal", return_value=session), patch.object( + order_service, + "hydrate_mock_customer_from_cpf", + new=fake_hydrate_mock_customer_from_cpf, + ), patch.object( + order_service, + "validar_cliente_venda", + new=fake_validar_cliente_venda, + ), patch.object(order_service, "_get_vehicle_for_update", return_value=vehicle) as locked_vehicle, patch.object( + order_service, + "_get_active_order_for_vehicle", + return_value=None, + ), patch.object(order_service, "uuid4", return_value=fake_uuid), patch.object(order_service, "utc_now", return_value=fixed_now): + result = await order_service.realizar_pedido(cpf="123.456.789-09", vehicle_id=8) + + locked_vehicle.assert_called_once_with(session, 8) + self.assertTrue(session.committed) + self.assertEqual(len(session.added), 1) + created_order = session.added[0] + self.assertEqual(created_order.vehicle_id, 8) + self.assertEqual(created_order.cpf, "12345678909") + self.assertEqual(result["numero_pedido"], "PED-20260316173000-ABC123") + self.assertEqual(result["vehicle_id"], 8) + self.assertEqual(result["status_veiculo"], "Reservado") + self.assertTrue(session.closed) + + +if __name__ == "__main__": + unittest.main()