You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
orquestrador/tests/test_rental_service.py

489 lines
18 KiB
Python

import asyncio
import threading
import time
import unittest
from datetime import datetime
from types import SimpleNamespace
from unittest.mock import patch
from fastapi import HTTPException
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.pool import StaticPool
from app.db.mock_database import MockBase
from app.db.mock_models import RentalContract, RentalFine, RentalPayment, RentalVehicle
from app.services.domain import rental_service
class RentalLockingQuery:
def __init__(self, result):
self.result = result
self.with_for_update_called = False
def filter(self, *args, **kwargs):
return self
def with_for_update(self):
self.with_for_update_called = True
return self
def first(self):
return self.result
class RentalLockingSession:
def __init__(self, vehicle=None):
self.vehicle = vehicle
self.query_instance = RentalLockingQuery(vehicle)
self.added = []
self.committed = False
self.closed = False
self.refreshed = []
def query(self, model):
if model is rental_service.RentalVehicle:
return self.query_instance
raise AssertionError(f"unexpected model query: {model}")
def add(self, item):
self.added.append(item)
def commit(self):
self.committed = True
def refresh(self, item):
self.refreshed.append(item)
def close(self):
self.closed = True
class RentalServiceTests(unittest.IsolatedAsyncioTestCase):
def _build_session_local(self):
engine = create_engine("sqlite:///:memory:")
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
MockBase.metadata.create_all(bind=engine)
self.addCleanup(engine.dispose)
return SessionLocal
def _build_threadsafe_session_local(self):
engine = create_engine(
"sqlite://",
connect_args={"check_same_thread": False},
poolclass=StaticPool,
)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
MockBase.metadata.create_all(bind=engine)
self.addCleanup(engine.dispose)
return SessionLocal
def _create_rental_vehicle(
self,
db,
*,
placa: str = "ABC1D23",
modelo: str = "Chevrolet Tracker",
categoria: str = "suv",
ano: int = 2024,
valor_diaria: float = 219.9,
status: str = "disponivel",
) -> RentalVehicle:
vehicle = RentalVehicle(
placa=placa,
modelo=modelo,
categoria=categoria,
ano=ano,
valor_diaria=valor_diaria,
status=status,
)
db.add(vehicle)
db.commit()
db.refresh(vehicle)
return vehicle
def _create_rental_contract(
self,
db,
vehicle: RentalVehicle,
*,
contrato_numero: str = "LOC-20260317-TESTE000",
user_id: int | None = None,
status: str = "ativa",
data_inicio: datetime | None = None,
data_fim_prevista: datetime | None = None,
) -> RentalContract:
contract = RentalContract(
contrato_numero=contrato_numero,
user_id=user_id,
rental_vehicle_id=vehicle.id,
placa=vehicle.placa,
modelo_veiculo=vehicle.modelo,
categoria=vehicle.categoria,
data_inicio=data_inicio or datetime(2026, 3, 17, 10, 0),
data_fim_prevista=data_fim_prevista or datetime(2026, 3, 20, 10, 0),
valor_diaria=float(vehicle.valor_diaria),
valor_previsto=round(float(vehicle.valor_diaria) * 3, 2),
status=status,
)
db.add(contract)
db.commit()
db.refresh(contract)
return contract
async def test_consultar_frota_aluguel_retains_only_available_by_default(self):
SessionLocal = self._build_session_local()
db = SessionLocal()
try:
self._create_rental_vehicle(db, placa="AAA1A11", modelo="Chevrolet Tracker", status="disponivel")
self._create_rental_vehicle(db, placa="BBB2B22", modelo="Fiat Toro", categoria="pickup", status="alugado")
self._create_rental_vehicle(db, placa="CCC3C33", modelo="Jeep Renegade", status="manutencao")
finally:
db.close()
with patch("app.services.domain.rental_service.SessionMockLocal", SessionLocal):
result = await rental_service.consultar_frota_aluguel(valor_diaria_max=300)
self.assertEqual(len(result), 1)
self.assertEqual(result[0]["placa"], "AAA1A11")
self.assertEqual(result[0]["status"], "disponivel")
async def test_consultar_frota_aluguel_filtra_por_modelo(self):
SessionLocal = self._build_session_local()
db = SessionLocal()
try:
self._create_rental_vehicle(db, placa="AAA1A11", modelo="Chevrolet Tracker")
self._create_rental_vehicle(db, placa="BBB2B22", modelo="Fiat Pulse")
finally:
db.close()
with patch("app.services.domain.rental_service.SessionMockLocal", SessionLocal):
result = await rental_service.consultar_frota_aluguel(modelo="tracker")
self.assertEqual(len(result), 1)
self.assertEqual(result[0]["modelo"], "Chevrolet Tracker")
async def test_consultar_frota_aluguel_randomiza_resultados_quando_solicitado(self):
SessionLocal = self._build_session_local()
db = SessionLocal()
try:
self._create_rental_vehicle(db, placa="AAA1A11", modelo="Chevrolet Tracker", valor_diaria=219.9)
self._create_rental_vehicle(db, placa="BBB2B22", modelo="Fiat Pulse", valor_diaria=189.9)
self._create_rental_vehicle(db, placa="CCC3C33", modelo="Renault Kwid", valor_diaria=119.9)
finally:
db.close()
with patch("app.services.domain.rental_service.SessionMockLocal", SessionLocal), patch(
"app.services.domain.rental_service.random.shuffle",
side_effect=lambda items: items.reverse(),
):
result = await rental_service.consultar_frota_aluguel(ordenar_diaria="random", limite=2)
self.assertEqual(len(result), 2)
self.assertEqual([item["placa"] for item in result], ["CCC3C33", "BBB2B22"])
async def test_abrir_locacao_aluguel_cria_contrato_e_marca_veiculo_como_alugado(self):
SessionLocal = self._build_session_local()
db = SessionLocal()
try:
vehicle = self._create_rental_vehicle(db)
vehicle_id = vehicle.id
vehicle_placa = vehicle.placa
finally:
db.close()
with patch("app.services.domain.rental_service.SessionMockLocal", SessionLocal):
result = await rental_service.abrir_locacao_aluguel(
placa=vehicle_placa,
data_inicio="17/03/2026 10:00",
data_fim_prevista="20/03/2026 10:00",
)
db = SessionLocal()
try:
stored_contract = db.query(RentalContract).one()
stored_vehicle = db.query(RentalVehicle).filter(RentalVehicle.id == vehicle_id).one()
self.assertEqual(stored_contract.placa, vehicle_placa)
self.assertEqual(stored_contract.status, "ativa")
self.assertEqual(stored_vehicle.status, "alugado")
self.assertEqual(result["status"], "ativa")
self.assertEqual(result["status_veiculo"], "alugado")
finally:
db.close()
async def test_abrir_locacao_aluguel_uses_row_lock_before_reserving_vehicle(self):
vehicle = RentalVehicle(
id=8,
placa="ABC1D23",
modelo="Chevrolet Tracker",
categoria="suv",
ano=2024,
valor_diaria=219.9,
status="disponivel",
)
session = RentalLockingSession(vehicle=vehicle)
fake_uuid = SimpleNamespace(hex="abc123def456")
fixed_now = datetime(2026, 3, 20, 9, 0)
with patch.object(rental_service, "SessionMockLocal", return_value=session), patch.object(
rental_service,
"uuid4",
return_value=fake_uuid,
), patch.object(rental_service, "utc_now", return_value=fixed_now):
result = await rental_service.abrir_locacao_aluguel(
rental_vehicle_id=8,
data_inicio="17/03/2026 10:00",
data_fim_prevista="20/03/2026 10:00",
)
self.assertTrue(session.query_instance.with_for_update_called)
self.assertTrue(session.committed)
self.assertEqual(len(session.added), 1)
self.assertEqual(session.added[0].rental_vehicle_id, 8)
self.assertEqual(vehicle.status, "alugado")
self.assertEqual(result["contrato_numero"], "LOC-20260320-ABC123DE")
self.assertEqual(result["status_veiculo"], "alugado")
self.assertTrue(session.closed)
async def test_abrir_locacao_aluguel_returns_conflict_when_vehicle_status_is_already_rented_after_lock(self):
vehicle = RentalVehicle(
id=8,
placa="ABC1D23",
modelo="Chevrolet Tracker",
categoria="suv",
ano=2024,
valor_diaria=219.9,
status="alugado",
)
session = RentalLockingSession(vehicle=vehicle)
with patch.object(rental_service, "SessionMockLocal", return_value=session):
with self.assertRaises(HTTPException) as ctx:
await rental_service.abrir_locacao_aluguel(
rental_vehicle_id=8,
data_inicio="17/03/2026 10:00",
data_fim_prevista="20/03/2026 10:00",
)
self.assertTrue(session.query_instance.with_for_update_called)
self.assertEqual(ctx.exception.status_code, 409)
self.assertEqual(ctx.exception.detail["code"], "rental_vehicle_unavailable")
self.assertFalse(session.committed)
self.assertEqual(session.added, [])
self.assertTrue(session.closed)
async def test_abrir_locacao_aluguel_allows_single_success_under_race(self):
SessionLocal = self._build_threadsafe_session_local()
db = SessionLocal()
try:
vehicle = self._create_rental_vehicle(db, placa="RAC1E01")
vehicle_id = vehicle.id
finally:
db.close()
attempts = 4
start_barrier = threading.Barrier(attempts)
vehicle_lock = threading.Lock()
def _get_locked_vehicle(db, *, rental_vehicle_id=None, placa=None):
acquired = vehicle_lock.acquire(timeout=2)
if not acquired:
raise AssertionError("failed to acquire rental race lock")
if not db.info.get("_test_rental_lock_wrapped"):
original_close = db.close
def close_with_lock_release():
try:
original_close()
finally:
held_lock = db.info.pop("_test_rental_vehicle_lock", None)
if held_lock and held_lock.locked():
held_lock.release()
db.close = close_with_lock_release
db.info["_test_rental_lock_wrapped"] = True
db.info["_test_rental_vehicle_lock"] = vehicle_lock
time.sleep(0.05)
return rental_service._build_rental_vehicle_query(
db,
rental_vehicle_id=rental_vehicle_id,
placa=placa,
).first()
def _sync_open_rental():
start_barrier.wait(timeout=5)
return asyncio.run(
rental_service.abrir_locacao_aluguel(
rental_vehicle_id=vehicle_id,
data_inicio="17/03/2026 10:00",
data_fim_prevista="20/03/2026 10:00",
)
)
with patch.object(rental_service, "SessionMockLocal", SessionLocal), patch.object(
rental_service,
"_get_rental_vehicle_for_update",
side_effect=_get_locked_vehicle,
):
results = await asyncio.gather(
*[asyncio.to_thread(_sync_open_rental) for _ in range(attempts)],
return_exceptions=True,
)
successes = [result for result in results if isinstance(result, dict)]
conflicts = [
result
for result in results
if isinstance(result, HTTPException)
and isinstance(result.detail, dict)
and result.detail.get("code") == "rental_vehicle_unavailable"
]
unexpected = [result for result in results if result not in successes and result not in conflicts]
self.assertEqual(len(successes), 1)
self.assertEqual(len(conflicts), attempts - 1)
self.assertEqual(unexpected, [])
db = SessionLocal()
try:
contracts = db.query(RentalContract).all()
stored_vehicle = db.query(RentalVehicle).filter(RentalVehicle.id == vehicle_id).one()
self.assertEqual(len(contracts), 1)
self.assertEqual(stored_vehicle.status, "alugado")
finally:
db.close()
async def test_registrar_devolucao_aluguel_fecha_contrato_e_libera_veiculo(self):
SessionLocal = self._build_session_local()
db = SessionLocal()
try:
vehicle = self._create_rental_vehicle(db, status="alugado")
vehicle_id = vehicle.id
vehicle_diaria = float(vehicle.valor_diaria)
contract = self._create_rental_contract(db, vehicle)
contract_number = contract.contrato_numero
finally:
db.close()
with patch("app.services.domain.rental_service.SessionMockLocal", SessionLocal):
result = await rental_service.registrar_devolucao_aluguel(
contrato_numero=contract_number,
data_devolucao="21/03/2026 09:00",
)
db = SessionLocal()
try:
stored_contract = db.query(RentalContract).one()
stored_vehicle = db.query(RentalVehicle).filter(RentalVehicle.id == vehicle_id).one()
self.assertEqual(stored_contract.status, "encerrada")
self.assertEqual(stored_vehicle.status, "disponivel")
self.assertEqual(result["status"], "encerrada")
self.assertEqual(result["status_veiculo"], "disponivel")
self.assertEqual(result["valor_final"], round(vehicle_diaria * 4, 2))
finally:
db.close()
async def test_registrar_pagamento_aluguel_persiste_registro(self):
SessionLocal = self._build_session_local()
with patch("app.services.domain.rental_service.SessionMockLocal", SessionLocal):
result = await rental_service.registrar_pagamento_aluguel(
contrato_numero="loc-123",
placa="abc1234",
valor=1540.5,
data_pagamento="17/03/2026 14:30",
favorecido="Locadora XPTO",
identificador_comprovante="NSU123",
user_id=9,
)
db = SessionLocal()
try:
stored = db.query(RentalPayment).one()
self.assertEqual(stored.contrato_numero, "LOC-123")
self.assertEqual(stored.placa, "ABC1234")
self.assertEqual(float(stored.valor), 1540.5)
self.assertEqual(result["status"], "registrado")
finally:
db.close()
async def test_registrar_pagamento_aluguel_vincula_unica_locacao_ativa_do_usuario(self):
SessionLocal = self._build_session_local()
db = SessionLocal()
try:
vehicle = self._create_rental_vehicle(db, status="alugado")
vehicle_placa = vehicle.placa
contract = self._create_rental_contract(db, vehicle, user_id=9)
finally:
db.close()
with patch("app.services.domain.rental_service.SessionMockLocal", SessionLocal):
result = await rental_service.registrar_pagamento_aluguel(
valor=879.90,
user_id=9,
)
db = SessionLocal()
try:
stored = db.query(RentalPayment).one()
self.assertEqual(stored.rental_contract_id, contract.id)
self.assertEqual(stored.contrato_numero, contract.contrato_numero)
self.assertEqual(stored.placa, vehicle_placa)
self.assertEqual(result["contrato_numero"], contract.contrato_numero)
finally:
db.close()
async def test_registrar_multa_aluguel_persiste_registro(self):
SessionLocal = self._build_session_local()
with patch("app.services.domain.rental_service.SessionMockLocal", SessionLocal):
result = await rental_service.registrar_multa_aluguel(
placa="abc1d23",
auto_infracao="A123456",
valor=293.47,
data_infracao="17/03/2026",
vencimento="10/04/2026",
orgao_emissor="DETRAN-SP",
user_id=11,
)
db = SessionLocal()
try:
stored = db.query(RentalFine).one()
self.assertEqual(stored.placa, "ABC1D23")
self.assertEqual(stored.auto_infracao, "A123456")
self.assertEqual(result["status"], "registrada")
finally:
db.close()
async def test_registrar_multa_aluguel_vincula_contrato_ativo_pela_placa(self):
SessionLocal = self._build_session_local()
db = SessionLocal()
try:
vehicle = self._create_rental_vehicle(db, placa="ABC1D23", status="alugado")
contract = self._create_rental_contract(db, vehicle, user_id=11)
finally:
db.close()
with patch("app.services.domain.rental_service.SessionMockLocal", SessionLocal):
result = await rental_service.registrar_multa_aluguel(
placa="ABC1D23",
auto_infracao="A123456",
valor=293.47,
user_id=11,
)
db = SessionLocal()
try:
stored = db.query(RentalFine).one()
self.assertEqual(stored.rental_contract_id, contract.id)
self.assertEqual(stored.contrato_numero, contract.contrato_numero)
self.assertEqual(result["contrato_numero"], contract.contrato_numero)
finally:
db.close()
if __name__ == "__main__":
unittest.main()