"""Tests for the rental domain service (``app.services.domain.rental_service``).

Covers fleet lookup, opening a rental (including row locking and a
multi-threaded race), registering a return and registering payments.
"""

import asyncio
import threading
import time
import unittest
from datetime import datetime
from types import SimpleNamespace
from unittest.mock import patch

from fastapi import HTTPException
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.pool import StaticPool

from app.db.mock_database import MockBase
from app.db.mock_models import RentalContract, RentalPayment, RentalVehicle
from app.services.domain import rental_service


class RentalLockingQuery:
    """Query stub that records whether ``with_for_update()`` was requested.

    ``filter()`` and ``with_for_update()`` return the stub itself so calls can
    be chained; ``first()`` returns the preset ``result``.
    """

    def __init__(self, result):
        self.result = result
        self.with_for_update_called = False

    def filter(self, *args, **kwargs):
        return self

    def with_for_update(self):
        self.with_for_update_called = True
        return self

    def first(self):
        return self.result


class RentalLockingSession:
    """Session stub that records add/commit/refresh/close calls.

    Only queries against ``rental_service.RentalVehicle`` are allowed; any
    other model raises ``AssertionError`` so unexpected queries fail loudly.
    """

    def __init__(self, vehicle=None):
        self.vehicle = vehicle
        self.query_instance = RentalLockingQuery(vehicle)
        self.added = []
        self.committed = False
        self.closed = False
        self.refreshed = []

    def query(self, model):
        if model is rental_service.RentalVehicle:
            return self.query_instance
        raise AssertionError(f"unexpected model query: {model}")

    def add(self, item):
        self.added.append(item)

    def commit(self):
        self.committed = True

    def refresh(self, item):
        self.refreshed.append(item)

    def close(self):
        self.closed = True


class RentalServiceTests(unittest.IsolatedAsyncioTestCase):
    """Async test case exercising the public coroutines of ``rental_service``."""

    def _build_session_local(self):
        """Return a session factory bound to a fresh in-memory SQLite database.

        The ``MockBase`` tables are created up front and the engine is
        disposed when the test finishes.
        """
        engine = create_engine("sqlite:///:memory:")
        SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
        MockBase.metadata.create_all(bind=engine)
        self.addCleanup(engine.dispose)
        return SessionLocal

    def _build_threadsafe_session_local(self):
        """Return a session factory usable from several threads at once.

        Uses ``StaticPool`` with ``check_same_thread=False`` so that worker
        threads can reach the same in-memory SQLite database.
        """
        engine = create_engine(
            "sqlite://",
            connect_args={"check_same_thread": False},
            poolclass=StaticPool,
        )
        SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
        MockBase.metadata.create_all(bind=engine)
        self.addCleanup(engine.dispose)
        return SessionLocal

    def _create_rental_vehicle(
        self,
        db,
        *,
        placa: str = "ABC1D23",
        modelo: str = "Chevrolet Tracker",
        categoria: str = "suv",
        ano: int = 2024,
        valor_diaria: float = 219.9,
        status: str = "disponivel",
    ) -> RentalVehicle:
        """Persist a ``RentalVehicle`` through ``db`` and return the refreshed row."""
        vehicle = RentalVehicle(
            placa=placa,
            modelo=modelo,
            categoria=categoria,
            ano=ano,
            valor_diaria=valor_diaria,
            status=status,
        )
        db.add(vehicle)
        db.commit()
        db.refresh(vehicle)
        return vehicle

    def _create_rental_contract(
        self,
        db,
        vehicle: RentalVehicle,
        *,
        contrato_numero: str = "LOC-20260317-TESTE000",
        user_id: int | None = None,
        status: str = "ativa",
        data_inicio: datetime | None = None,
        data_fim_prevista: datetime | None = None,
    ) -> RentalContract:
        """Persist a ``RentalContract`` for ``vehicle`` and return the refreshed row.

        Dates default to 2026-03-17 10:00 through 2026-03-20 10:00 and the
        expected value is three daily rates.
        """
        contract = RentalContract(
            contrato_numero=contrato_numero,
            user_id=user_id,
            rental_vehicle_id=vehicle.id,
            placa=vehicle.placa,
            modelo_veiculo=vehicle.modelo,
            categoria=vehicle.categoria,
            data_inicio=data_inicio or datetime(2026, 3, 17, 10, 0),
            data_fim_prevista=data_fim_prevista or datetime(2026, 3, 20, 10, 0),
            valor_diaria=float(vehicle.valor_diaria),
            valor_previsto=round(float(vehicle.valor_diaria) * 3, 2),
            status=status,
        )
        db.add(contract)
        db.commit()
        db.refresh(contract)
        return contract

    async def test_consultar_frota_aluguel_retains_only_available_by_default(self):
        """Without a status filter only the ``disponivel`` vehicle is returned."""
        SessionLocal = self._build_session_local()
        db = SessionLocal()
        try:
            self._create_rental_vehicle(db, placa="AAA1A11", modelo="Chevrolet Tracker", status="disponivel")
            self._create_rental_vehicle(db, placa="BBB2B22", modelo="Fiat Toro", categoria="pickup", status="alugado")
            self._create_rental_vehicle(db, placa="CCC3C33", modelo="Jeep Renegade", status="manutencao")
        finally:
            db.close()

        with patch("app.services.domain.rental_service.SessionMockLocal", SessionLocal):
            result = await rental_service.consultar_frota_aluguel(valor_diaria_max=300)

        self.assertEqual(len(result), 1)
        self.assertEqual(result[0]["placa"], "AAA1A11")
        self.assertEqual(result[0]["status"], "disponivel")

    async def test_consultar_frota_aluguel_filtra_por_modelo(self):
        """A lowercase partial model name selects only the matching vehicle."""
        SessionLocal = self._build_session_local()
        db = SessionLocal()
        try:
            self._create_rental_vehicle(db, placa="AAA1A11", modelo="Chevrolet Tracker")
            self._create_rental_vehicle(db, placa="BBB2B22", modelo="Fiat Pulse")
        finally:
            db.close()

        with patch("app.services.domain.rental_service.SessionMockLocal", SessionLocal):
            result = await rental_service.consultar_frota_aluguel(modelo="tracker")

        self.assertEqual(len(result), 1)
        self.assertEqual(result[0]["modelo"], "Chevrolet Tracker")

    async def test_consultar_frota_aluguel_randomiza_resultados_quando_solicitado(self):
        """``ordenar_diaria="random"`` shuffles the results and honours ``limite``.

        ``random.shuffle`` is patched to an in-place reverse so the expected
        order is deterministic.
        """
        SessionLocal = self._build_session_local()
        db = SessionLocal()
        try:
            self._create_rental_vehicle(db, placa="AAA1A11", modelo="Chevrolet Tracker", valor_diaria=219.9)
            self._create_rental_vehicle(db, placa="BBB2B22", modelo="Fiat Pulse", valor_diaria=189.9)
            self._create_rental_vehicle(db, placa="CCC3C33", modelo="Renault Kwid", valor_diaria=119.9)
        finally:
            db.close()

        with patch("app.services.domain.rental_service.SessionMockLocal", SessionLocal), patch(
            "app.services.domain.rental_service.random.shuffle",
            side_effect=lambda items: items.reverse(),
        ):
            result = await rental_service.consultar_frota_aluguel(ordenar_diaria="random", limite=2)

        self.assertEqual(len(result), 2)
        self.assertEqual([item["placa"] for item in result], ["CCC3C33", "BBB2B22"])

    async def test_abrir_locacao_aluguel_cria_contrato_e_marca_veiculo_como_alugado(self):
        """Opening a rental stores an ``ativa`` contract and marks the vehicle ``alugado``."""
        SessionLocal = self._build_session_local()
        db = SessionLocal()
        try:
            vehicle = self._create_rental_vehicle(db)
            vehicle_id = vehicle.id
            vehicle_placa = vehicle.placa
        finally:
            db.close()

        with patch("app.services.domain.rental_service.SessionMockLocal", SessionLocal):
            result = await rental_service.abrir_locacao_aluguel(
                placa=vehicle_placa,
                data_inicio="17/03/2026 10:00",
                data_fim_prevista="20/03/2026 10:00",
            )

        db = SessionLocal()
        try:
            stored_contract = db.query(RentalContract).one()
            stored_vehicle = db.query(RentalVehicle).filter(RentalVehicle.id == vehicle_id).one()
            self.assertEqual(stored_contract.placa, vehicle_placa)
            self.assertEqual(stored_contract.status, "ativa")
            self.assertEqual(stored_vehicle.status, "alugado")
            self.assertEqual(result["status"], "ativa")
            self.assertEqual(result["status_veiculo"], "alugado")
        finally:
            db.close()

    async def test_abrir_locacao_aluguel_uses_row_lock_before_reserving_vehicle(self):
        """The vehicle is fetched via ``with_for_update()`` before being reserved.

        ``uuid4`` and ``utc_now`` are patched so the generated contract number
        can be asserted exactly.
        """
        vehicle = RentalVehicle(
            id=8,
            placa="ABC1D23",
            modelo="Chevrolet Tracker",
            categoria="suv",
            ano=2024,
            valor_diaria=219.9,
            status="disponivel",
        )
        session = RentalLockingSession(vehicle=vehicle)
        fake_uuid = SimpleNamespace(hex="abc123def456")
        fixed_now = datetime(2026, 3, 20, 9, 0)

        with patch.object(rental_service, "SessionMockLocal", return_value=session), patch.object(
            rental_service,
            "uuid4",
            return_value=fake_uuid,
        ), patch.object(rental_service, "utc_now", return_value=fixed_now):
            result = await rental_service.abrir_locacao_aluguel(
                rental_vehicle_id=8,
                data_inicio="17/03/2026 10:00",
                data_fim_prevista="20/03/2026 10:00",
            )

        self.assertTrue(session.query_instance.with_for_update_called)
        self.assertTrue(session.committed)
        self.assertEqual(len(session.added), 1)
        self.assertEqual(session.added[0].rental_vehicle_id, 8)
        self.assertEqual(vehicle.status, "alugado")
        self.assertEqual(result["contrato_numero"], "LOC-20260320-ABC123DE")
        self.assertEqual(result["status_veiculo"], "alugado")
        self.assertTrue(session.closed)

    async def test_abrir_locacao_aluguel_returns_conflict_when_vehicle_status_is_already_rented_after_lock(self):
        """A vehicle already ``alugado`` yields HTTP 409 and nothing is persisted."""
        vehicle = RentalVehicle(
            id=8,
            placa="ABC1D23",
            modelo="Chevrolet Tracker",
            categoria="suv",
            ano=2024,
            valor_diaria=219.9,
            status="alugado",
        )
        session = RentalLockingSession(vehicle=vehicle)

        with patch.object(rental_service, "SessionMockLocal", return_value=session):
            with self.assertRaises(HTTPException) as ctx:
                await rental_service.abrir_locacao_aluguel(
                    rental_vehicle_id=8,
                    data_inicio="17/03/2026 10:00",
                    data_fim_prevista="20/03/2026 10:00",
                )

        self.assertTrue(session.query_instance.with_for_update_called)
        self.assertEqual(ctx.exception.status_code, 409)
        self.assertEqual(ctx.exception.detail["code"], "rental_vehicle_unavailable")
        self.assertFalse(session.committed)
        self.assertEqual(session.added, [])
        self.assertTrue(session.closed)

    async def test_abrir_locacao_aluguel_allows_single_success_under_race(self):
        """Of several concurrent attempts exactly one opens the rental.

        ``_get_rental_vehicle_for_update`` is replaced by a helper that holds a
        ``threading.Lock`` from the vehicle lookup until the session is closed,
        standing in for the row lock during the test.
        """
        SessionLocal = self._build_threadsafe_session_local()
        db = SessionLocal()
        try:
            vehicle = self._create_rental_vehicle(db, placa="RAC1E01")
            vehicle_id = vehicle.id
        finally:
            db.close()

        attempts = 4
        start_barrier = threading.Barrier(attempts)
        vehicle_lock = threading.Lock()

        def _get_locked_vehicle(db, *, rental_vehicle_id=None, placa=None):
            acquired = vehicle_lock.acquire(timeout=2)
            if not acquired:
                raise AssertionError("failed to acquire rental race lock")

            # Wrap close() only once per session so the lock is released when
            # the service finishes with this session.
            if not db.info.get("_test_rental_lock_wrapped"):
                original_close = db.close

                def close_with_lock_release():
                    try:
                        original_close()
                    finally:
                        held_lock = db.info.pop("_test_rental_vehicle_lock", None)
                        if held_lock and held_lock.locked():
                            held_lock.release()

                db.close = close_with_lock_release
                db.info["_test_rental_lock_wrapped"] = True

            db.info["_test_rental_vehicle_lock"] = vehicle_lock
            # Short pause while holding the lock to widen the race window.
            time.sleep(0.05)
            return rental_service._build_rental_vehicle_query(
                db,
                rental_vehicle_id=rental_vehicle_id,
                placa=placa,
            ).first()

        def _sync_open_rental():
            # All workers start together; each runs the coroutine on its own loop.
            start_barrier.wait(timeout=5)
            return asyncio.run(
                rental_service.abrir_locacao_aluguel(
                    rental_vehicle_id=vehicle_id,
                    data_inicio="17/03/2026 10:00",
                    data_fim_prevista="20/03/2026 10:00",
                )
            )

        with patch.object(rental_service, "SessionMockLocal", SessionLocal), patch.object(
            rental_service,
            "_get_rental_vehicle_for_update",
            side_effect=_get_locked_vehicle,
        ):
            results = await asyncio.gather(
                *[asyncio.to_thread(_sync_open_rental) for _ in range(attempts)],
                return_exceptions=True,
            )

        successes = [result for result in results if isinstance(result, dict)]
        conflicts = [
            result
            for result in results
            if isinstance(result, HTTPException)
            and isinstance(result.detail, dict)
            and result.detail.get("code") == "rental_vehicle_unavailable"
        ]
        unexpected = [result for result in results if result not in successes and result not in conflicts]

        # Check for unexpected outcomes first so that a stray exception is
        # reported directly instead of surfacing as a confusing count mismatch.
        self.assertEqual(unexpected, [])
        self.assertEqual(len(successes), 1)
        self.assertEqual(len(conflicts), attempts - 1)

        db = SessionLocal()
        try:
            contracts = db.query(RentalContract).all()
            stored_vehicle = db.query(RentalVehicle).filter(RentalVehicle.id == vehicle_id).one()
            self.assertEqual(len(contracts), 1)
            self.assertEqual(stored_vehicle.status, "alugado")
        finally:
            db.close()

    async def test_registrar_devolucao_aluguel_fecha_contrato_e_libera_veiculo(self):
        """Registering a return closes the contract and frees the vehicle.

        The expected ``valor_final`` is four daily rates for a rental from
        17/03 10:00 returned on 21/03 09:00.
        """
        SessionLocal = self._build_session_local()
        db = SessionLocal()
        try:
            vehicle = self._create_rental_vehicle(db, status="alugado")
            vehicle_id = vehicle.id
            vehicle_diaria = float(vehicle.valor_diaria)
            contract = self._create_rental_contract(db, vehicle)
            contract_number = contract.contrato_numero
        finally:
            db.close()

        with patch("app.services.domain.rental_service.SessionMockLocal", SessionLocal):
            result = await rental_service.registrar_devolucao_aluguel(
                contrato_numero=contract_number,
                data_devolucao="21/03/2026 09:00",
            )

        db = SessionLocal()
        try:
            stored_contract = db.query(RentalContract).one()
            stored_vehicle = db.query(RentalVehicle).filter(RentalVehicle.id == vehicle_id).one()
            self.assertEqual(stored_contract.status, "encerrada")
            self.assertEqual(stored_vehicle.status, "disponivel")
            self.assertEqual(result["status"], "encerrada")
            self.assertEqual(result["status_veiculo"], "disponivel")
            self.assertEqual(result["valor_final"], round(vehicle_diaria * 4, 2))
        finally:
            db.close()

    async def test_registrar_pagamento_aluguel_persiste_registro(self):
        """A payment is stored with contract number and plate upper-cased."""
        SessionLocal = self._build_session_local()

        with patch("app.services.domain.rental_service.SessionMockLocal", SessionLocal):
            result = await rental_service.registrar_pagamento_aluguel(
                contrato_numero="loc-123",
                placa="abc1234",
                valor=1540.5,
                data_pagamento="17/03/2026 14:30",
                favorecido="Locadora XPTO",
                identificador_comprovante="NSU123",
                user_id=9,
            )

        db = SessionLocal()
        try:
            stored = db.query(RentalPayment).one()
            self.assertEqual(stored.contrato_numero, "LOC-123")
            self.assertEqual(stored.placa, "ABC1234")
            self.assertEqual(float(stored.valor), 1540.5)
            self.assertEqual(result["status"], "registrado")
        finally:
            db.close()

    async def test_registrar_pagamento_aluguel_vincula_unica_locacao_ativa_do_usuario(self):
        """A payment given only ``user_id`` is linked to that user's single active contract."""
        SessionLocal = self._build_session_local()
        db = SessionLocal()
        try:
            vehicle = self._create_rental_vehicle(db, status="alugado")
            vehicle_placa = vehicle.placa
            contract = self._create_rental_contract(db, vehicle, user_id=9)
            # Snapshot the values while the session is open; the instance is
            # detached after close(), so the assertions below must not touch it.
            contract_id = contract.id
            contract_number = contract.contrato_numero
        finally:
            db.close()

        with patch("app.services.domain.rental_service.SessionMockLocal", SessionLocal):
            result = await rental_service.registrar_pagamento_aluguel(
                valor=879.90,
                user_id=9,
            )

        db = SessionLocal()
        try:
            stored = db.query(RentalPayment).one()
            self.assertEqual(stored.rental_contract_id, contract_id)
            self.assertEqual(stored.contrato_numero, contract_number)
            self.assertEqual(stored.placa, vehicle_placa)
            self.assertEqual(result["contrato_numero"], contract_number)
        finally:
            db.close()


if __name__ == "__main__":
    unittest.main()