🛡️ fix(order): blindar reserva concorrente de veiculos e adicionar stress smoke

- protege a criacao de pedidos com lock nomeado no MySQL e tentativa de row lock no veiculo antes da persistencia

- reaproveita a checagem final de reserva apos o lock para evitar dupla reserva do mesmo veiculo em atendimentos concorrentes

- adiciona regressao dedicada para garantir o uso de with_for_update e o conflito vehicle_already_reserved

- inclui um stress smoke para validar persistencia de estado, ciclos completos de pedido e corrida de reserva com Redis
main
parent 31cd7cdb69
commit 71592c544e

@ -1,4 +1,3 @@
from datetime import datetime
from app.core.time_utils import utc_now
from typing import Any
from uuid import uuid4
@ -17,6 +16,57 @@ from app.services.user.mock_customer_service import hydrate_mock_customer_from_c
# Responsabilidade: regra de pedido.
def _get_vehicle_for_update(db, vehicle_id: int) -> Vehicle | None:
return (
db.query(Vehicle)
.filter(Vehicle.id == vehicle_id)
.with_for_update()
.first()
)
def _get_active_order_for_vehicle(db, vehicle_id: int) -> Order | None:
return (
db.query(Order)
.filter(Order.vehicle_id == vehicle_id)
.filter(Order.status != "Cancelado")
.first()
)
def _acquire_vehicle_reservation_lock(db, vehicle_id: int, timeout_seconds: int = 5) -> str | None:
lock_name = f"orquestrador:vehicle_reservation:{vehicle_id}"
try:
acquired = db.execute(
text("SELECT GET_LOCK(:lock_name, :timeout_seconds)"),
{"lock_name": lock_name, "timeout_seconds": timeout_seconds},
).scalar()
except (OperationalError, SQLAlchemyError):
return None
if int(acquired or 0) != 1:
raise_tool_http_error(
status_code=409,
code="vehicle_reservation_busy",
message="Outro atendimento esta finalizando a reserva deste veiculo. Tente novamente.",
retryable=True,
field="vehicle_id",
)
return lock_name
def _release_vehicle_reservation_lock(db, lock_name: str | None) -> None:
if not lock_name:
return
try:
db.execute(
text("SELECT RELEASE_LOCK(:lock_name)"),
{"lock_name": lock_name},
)
except (OperationalError, SQLAlchemyError):
pass
async def listar_pedidos(
user_id: int | None = None,
cpf: str | None = None,
@ -161,6 +211,7 @@ async def realizar_pedido(
) -> dict[str, Any]:
cpf_norm = normalize_cpf(cpf)
db = SessionMockLocal()
reservation_lock_name: str | None = None
try:
vehicle = db.query(Vehicle).filter(Vehicle.id == vehicle_id).first()
if not vehicle:
@ -172,27 +223,6 @@ async def realizar_pedido(
field="vehicle_id",
)
existing_order = None
try:
existing_order = (
db.query(Order)
.filter(Order.vehicle_id == vehicle_id)
.filter(Order.status != "Cancelado")
.first()
)
except (OperationalError, SQLAlchemyError) as exc:
if not is_legacy_schema_issue(exc):
raise
db.rollback()
if existing_order:
raise_tool_http_error(
status_code=409,
code="vehicle_already_reserved",
message="Este veiculo ja esta reservado e nao aparece mais no estoque disponivel.",
retryable=True,
field="vehicle_id",
)
valor_veiculo = float(vehicle.preco)
modelo_veiculo = str(vehicle.modelo)
@ -210,6 +240,41 @@ async def realizar_pedido(
field="cpf",
)
reservation_lock_name = _acquire_vehicle_reservation_lock(db, vehicle_id)
try:
locked_vehicle = _get_vehicle_for_update(db, vehicle_id)
except (OperationalError, SQLAlchemyError) as exc:
db.rollback()
if not is_legacy_schema_issue(exc):
raise
locked_vehicle = db.query(Vehicle).filter(Vehicle.id == vehicle_id).first()
if not locked_vehicle:
raise_tool_http_error(
status_code=404,
code="vehicle_not_found",
message="Veiculo nao encontrado no estoque.",
retryable=True,
field="vehicle_id",
)
existing_order = None
try:
existing_order = _get_active_order_for_vehicle(db, vehicle_id)
except (OperationalError, SQLAlchemyError) as exc:
if not is_legacy_schema_issue(exc):
raise
db.rollback()
if existing_order:
raise_tool_http_error(
status_code=409,
code="vehicle_already_reserved",
message="Este veiculo ja esta reservado e nao aparece mais no estoque disponivel.",
retryable=True,
field="vehicle_id",
)
numero_pedido = f"PED-{utc_now().strftime('%Y%m%d%H%M%S')}-{uuid4().hex[:6].upper()}"
if user_id is not None:
user = db.query(User).filter(User.id == user_id).first()
@ -220,7 +285,7 @@ async def realizar_pedido(
numero_pedido=numero_pedido,
user_id=user_id,
cpf=cpf_norm,
vehicle_id=vehicle.id,
vehicle_id=locked_vehicle.id,
modelo_veiculo=modelo_veiculo,
valor_veiculo=valor_veiculo,
status="Ativo",
@ -251,7 +316,7 @@ async def realizar_pedido(
"numero_pedido": numero_pedido,
"user_id": user_id,
"cpf": cpf_norm,
"vehicle_id": vehicle.id,
"vehicle_id": locked_vehicle.id,
"modelo_veiculo": modelo_veiculo,
"status": "Ativo",
"status_veiculo": "Reservado",
@ -271,4 +336,5 @@ async def realizar_pedido(
"aprovado_credito": True,
}
finally:
_release_vehicle_reservation_lock(db, reservation_lock_name)
db.close()

@ -0,0 +1,166 @@
import argparse
import asyncio
import os
import sys
from pathlib import Path
from dotenv import load_dotenv
from fastapi import HTTPException
PROJECT_ROOT = Path(__file__).resolve().parents[1]
if str(PROJECT_ROOT) not in sys.path:
sys.path.insert(0, str(PROJECT_ROOT))
def configure_environment(dotenv_path: str, backend: str, redis_url: str) -> None:
env_path = Path(dotenv_path)
if env_path.exists():
load_dotenv(env_path)
os.environ["CONVERSATION_STATE_BACKEND"] = backend
os.environ["REDIS_URL"] = redis_url
os.environ.setdefault("DEBUG", "false")
async def run_state_stress(repo, iterations: int, user_base: int) -> None:
print(f"[state] iniciando {iterations} iteracao(oes) no backend {type(repo).__name__}")
touched_user_ids: list[int] = []
for offset in range(iterations):
user_id = user_base + offset
touched_user_ids.append(user_id)
repo.upsert_user_context(user_id, ttl_minutes=60)
repo.save_user_context(
user_id,
{
"active_domain": "sales",
"active_task": "order_create",
"generic_memory": {"cpf": "12345678909", "orcamento_max": 80000 + offset},
"shared_memory": {},
"collected_slots": {},
"flow_snapshots": {},
"last_tool_result": None,
"order_queue": [],
"pending_order_selection": None,
"pending_switch": None,
"last_stock_results": [],
"selected_vehicle": None,
},
)
repo.set_entry(
"pending_order_drafts",
user_id,
{
"payload": {"cpf": "12345678909", "vehicle_id": 8},
},
)
context = repo.get_user_context(user_id)
draft = repo.get_entry("pending_order_drafts", user_id, expire=True)
if not context or context.get("active_task") != "order_create":
raise RuntimeError(f"contexto nao persistido corretamente para user_id={user_id}")
if not draft or draft.get("payload", {}).get("vehicle_id") != 8:
raise RuntimeError(f"draft nao persistido corretamente para user_id={user_id}")
repo.pop_entry("pending_order_drafts", user_id)
if hasattr(repo, "redis"):
keys_to_delete = []
for user_id in touched_user_ids:
keys_to_delete.append(f"{repo.key_prefix}:user_contexts:{user_id}")
keys_to_delete.append(f"{repo.key_prefix}:pending_order_drafts:{user_id}")
if keys_to_delete:
repo.redis.delete(*keys_to_delete)
print("[state] ok")
async def run_order_cycles(order_cycles: int, cpf: str) -> None:
from app.services.domain.inventory_service import consultar_estoque
from app.services.domain.order_service import cancelar_pedido, listar_pedidos, realizar_pedido
print(f"[orders] iniciando {order_cycles} ciclo(s) completos")
for cycle in range(order_cycles):
estoque = await consultar_estoque(preco_max=80000, limite=1, ordenar_preco="asc")
if not estoque:
raise RuntimeError("nenhum veiculo encontrado para o ciclo de pedido")
vehicle_id = int(estoque[0]["id"])
pedido = await realizar_pedido(cpf=cpf, vehicle_id=vehicle_id, user_id=None)
pedidos = await listar_pedidos(cpf=cpf, limite=10)
if not any(item.get("numero_pedido") == pedido.get("numero_pedido") for item in pedidos):
raise RuntimeError("pedido criado nao apareceu na listagem")
cancelado = await cancelar_pedido(
numero_pedido=pedido["numero_pedido"],
motivo=f"stress-cycle-{cycle}",
user_id=None,
)
if cancelado.get("status") != "Cancelado":
raise RuntimeError("pedido nao foi cancelado corretamente no ciclo de estresse")
print("[orders] ok")
def _sync_create_order(cpf: str, vehicle_id: int):
from app.services.domain.order_service import realizar_pedido
return asyncio.run(realizar_pedido(cpf=cpf, vehicle_id=vehicle_id, user_id=None))
async def run_reservation_race(attempts: int, cpf: str) -> None:
from app.services.domain.inventory_service import consultar_estoque
from app.services.domain.order_service import cancelar_pedido
print(f"[race] iniciando corrida com {attempts} tentativa(s) para o mesmo veiculo")
estoque = await consultar_estoque(preco_max=80000, limite=1, ordenar_preco="asc")
if not estoque:
raise RuntimeError("nenhum veiculo encontrado para a corrida de reserva")
vehicle_id = int(estoque[0]["id"])
tasks = [asyncio.to_thread(_sync_create_order, cpf, vehicle_id) for _ in range(attempts)]
results = await asyncio.gather(*tasks, return_exceptions=True)
successes = [result for result in results if isinstance(result, dict)]
conflict_codes = {"vehicle_already_reserved", "vehicle_reservation_busy"}
conflicts = [
result for result in results
if isinstance(result, HTTPException) and isinstance(result.detail, dict) and result.detail.get("code") in conflict_codes
]
unexpected = [result for result in results if result not in successes and result not in conflicts]
if len(successes) != 1:
raise RuntimeError(f"corrida de reserva retornou {len(successes)} sucesso(s); esperado exatamente 1")
if len(conflicts) != attempts - 1:
raise RuntimeError(f"corrida de reserva retornou {len(conflicts)} conflito(s); esperado {attempts - 1}")
if unexpected:
raise RuntimeError(f"corrida de reserva retornou erro(s) inesperado(s): {unexpected!r}")
await cancelar_pedido(
numero_pedido=successes[0]["numero_pedido"],
motivo="stress-race-cleanup",
user_id=None,
)
print("[race] ok")
async def main(args) -> None:
configure_environment(args.dotenv, backend=args.backend, redis_url=args.redis_url)
import app.services.orchestration.state_repository_factory as factory
from app.db.init_db import init_db
from app.services.orchestration.state_repository_factory import get_conversation_state_repository
factory._state_repository = None
init_db()
repo = get_conversation_state_repository()
await run_state_stress(repo=repo, iterations=args.state_iterations, user_base=args.user_base)
await run_order_cycles(order_cycles=args.order_cycles, cpf=args.cpf)
await run_reservation_race(attempts=args.race_attempts, cpf=args.cpf)
print("[done] stress smoke concluido com sucesso")
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Stress smoke do orquestrador com Redis e fluxo de pedidos")
parser.add_argument("--dotenv", default=".env.local")
parser.add_argument("--backend", default="redis")
parser.add_argument("--redis-url", default="redis://127.0.0.1:6379/0")
parser.add_argument("--state-iterations", type=int, default=50)
parser.add_argument("--order-cycles", type=int, default=10)
parser.add_argument("--race-attempts", type=int, default=5)
parser.add_argument("--user-base", type=int, default=990000)
parser.add_argument("--cpf", default="10000000001")
asyncio.run(main(parser.parse_args()))

@ -0,0 +1,163 @@
import os
import unittest
from datetime import datetime
from types import SimpleNamespace
from unittest.mock import patch
os.environ.setdefault("DEBUG", "false")
from fastapi import HTTPException
from app.db.mock_models import Vehicle
from app.services.domain import order_service
class LockingQuery:
def __init__(self, result):
self.result = result
self.with_for_update_called = False
def filter(self, *args, **kwargs):
return self
def with_for_update(self):
self.with_for_update_called = True
return self
def first(self):
return self.result
class LockingSession:
def __init__(self, result):
self.query_instance = LockingQuery(result)
def query(self, model):
return self.query_instance
class FakeSession:
def __init__(self, vehicle=None, user=None):
self.vehicle = vehicle
self.user = user
self.added = []
self.committed = False
self.closed = False
self.rolled_back = False
self.refreshed = []
def query(self, model):
if model is order_service.Vehicle:
return LockingQuery(self.vehicle)
if model is order_service.User:
return LockingQuery(self.user)
raise AssertionError(f"unexpected model query: {model}")
def execute(self, statement, params=None):
sql_text = str(statement)
if "GET_LOCK" in sql_text:
return SimpleNamespace(scalar=lambda: 1)
if "RELEASE_LOCK" in sql_text:
return SimpleNamespace(scalar=lambda: 1)
raise AssertionError(f"unexpected execute call: {sql_text}")
def add(self, item):
self.added.append(item)
def commit(self):
self.committed = True
def rollback(self):
self.rolled_back = True
def refresh(self, item):
self.refreshed.append(item)
def close(self):
self.closed = True
class OrderServiceReservationTests(unittest.IsolatedAsyncioTestCase):
def test_get_vehicle_for_update_uses_row_lock(self):
vehicle = Vehicle(id=8, modelo="Toyota Corolla 2024", categoria="suv", preco=76087.0)
session = LockingSession(vehicle)
locked_vehicle = order_service._get_vehicle_for_update(session, 8)
self.assertIs(locked_vehicle, vehicle)
self.assertTrue(session.query_instance.with_for_update_called)
async def test_realizar_pedido_raises_conflict_when_locked_vehicle_is_already_reserved(self):
vehicle = Vehicle(id=8, modelo="Toyota Corolla 2024", categoria="suv", preco=76087.0)
session = FakeSession(vehicle=vehicle)
existing_order = SimpleNamespace(numero_pedido="PED-RESERVA-001", status="Ativo")
async def fake_hydrate_mock_customer_from_cpf(cpf: str, user_id: int | None = None):
return {"cpf": cpf, "user_id": user_id}
async def fake_validar_cliente_venda(cpf: str, valor_veiculo: float):
return {"aprovado": True}
with patch.object(order_service, "SessionMockLocal", return_value=session), patch.object(
order_service,
"hydrate_mock_customer_from_cpf",
new=fake_hydrate_mock_customer_from_cpf,
), patch.object(
order_service,
"validar_cliente_venda",
new=fake_validar_cliente_venda,
), patch.object(order_service, "_get_vehicle_for_update", return_value=vehicle) as locked_vehicle, patch.object(
order_service,
"_get_active_order_for_vehicle",
return_value=existing_order,
):
with self.assertRaises(HTTPException) as ctx:
await order_service.realizar_pedido(cpf="123.456.789-09", vehicle_id=8)
self.assertEqual(ctx.exception.status_code, 409)
self.assertEqual(ctx.exception.detail["code"], "vehicle_already_reserved")
locked_vehicle.assert_called_once_with(session, 8)
self.assertEqual(session.added, [])
self.assertTrue(session.closed)
async def test_realizar_pedido_uses_locked_vehicle_before_persisting_order(self):
vehicle = Vehicle(id=8, modelo="Toyota Corolla 2024", categoria="suv", preco=76087.0)
session = FakeSession(vehicle=vehicle)
fake_uuid = SimpleNamespace(hex="abc123def456")
fixed_now = datetime(2026, 3, 16, 17, 30, 0)
async def fake_hydrate_mock_customer_from_cpf(cpf: str, user_id: int | None = None):
return {"cpf": cpf, "user_id": user_id}
async def fake_validar_cliente_venda(cpf: str, valor_veiculo: float):
return {"aprovado": True}
with patch.object(order_service, "SessionMockLocal", return_value=session), patch.object(
order_service,
"hydrate_mock_customer_from_cpf",
new=fake_hydrate_mock_customer_from_cpf,
), patch.object(
order_service,
"validar_cliente_venda",
new=fake_validar_cliente_venda,
), patch.object(order_service, "_get_vehicle_for_update", return_value=vehicle) as locked_vehicle, patch.object(
order_service,
"_get_active_order_for_vehicle",
return_value=None,
), patch.object(order_service, "uuid4", return_value=fake_uuid), patch.object(order_service, "utc_now", return_value=fixed_now):
result = await order_service.realizar_pedido(cpf="123.456.789-09", vehicle_id=8)
locked_vehicle.assert_called_once_with(session, 8)
self.assertTrue(session.committed)
self.assertEqual(len(session.added), 1)
created_order = session.added[0]
self.assertEqual(created_order.vehicle_id, 8)
self.assertEqual(created_order.cpf, "12345678909")
self.assertEqual(result["numero_pedido"], "PED-20260316173000-ABC123")
self.assertEqual(result["vehicle_id"], 8)
self.assertEqual(result["status_veiculo"], "Reservado")
self.assertTrue(session.closed)
if __name__ == "__main__":
unittest.main()
Loading…
Cancel
Save