You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
orquestrador/tests/test_review_service.py

134 lines
4.6 KiB
Python

import unittest
from datetime import datetime
from types import SimpleNamespace
from unittest.mock import patch
from fastapi import HTTPException
from app.db.mock_models import ReviewSchedule
from app.services.domain import review_service
class ReviewLockingQuery:
def __init__(self, results=None):
self.results = list(results or [])
def filter(self, *args, **kwargs):
return self
def first(self):
if self.results:
return self.results.pop(0)
return None
class ReviewLockingSession:
def __init__(self, *, query_results=None, lock_acquired=1):
self.query_instance = ReviewLockingQuery(query_results)
self.lock_acquired = lock_acquired
self.execute_calls = []
self.added = []
self.committed = False
self.closed = False
self.refreshed = []
def query(self, model):
if model is review_service.ReviewSchedule:
return self.query_instance
raise AssertionError(f"unexpected model query: {model}")
def execute(self, statement, params=None):
sql_text = str(statement)
self.execute_calls.append((sql_text, params))
if "GET_LOCK" in sql_text:
return SimpleNamespace(scalar=lambda: self.lock_acquired)
if "RELEASE_LOCK" in sql_text:
return SimpleNamespace(scalar=lambda: 1)
raise AssertionError(f"unexpected execute call: {sql_text}")
def add(self, item):
self.added.append(item)
def commit(self):
self.committed = True
def refresh(self, item):
self.refreshed.append(item)
def close(self):
self.closed = True
class ReviewServiceLockingTests(unittest.IsolatedAsyncioTestCase):
def test_acquire_review_slot_lock_returns_conflict_when_slot_is_busy(self):
session = ReviewLockingSession(lock_acquired=0)
with self.assertRaises(HTTPException) as ctx:
review_service._acquire_review_slot_lock(
session,
requested_dt=datetime(2026, 3, 18, 9, 0),
)
self.assertEqual(ctx.exception.status_code, 409)
self.assertEqual(ctx.exception.detail["code"], "review_slot_busy")
self.assertTrue(any("GET_LOCK" in sql for sql, _ in session.execute_calls))
async def test_agendar_revisao_uses_slot_lock_and_releases_after_success(self):
session = ReviewLockingSession(query_results=[None, None])
with patch.object(review_service, "SessionMockLocal", return_value=session):
result = await review_service.agendar_revisao(
placa="ABC1234",
data_hora="18/03/2026 09:00",
modelo="Onix",
ano=2022,
km=15000,
revisao_previa_concessionaria=False,
user_id=7,
)
self.assertTrue(any("GET_LOCK" in sql for sql, _ in session.execute_calls))
self.assertTrue(any("RELEASE_LOCK" in sql for sql, _ in session.execute_calls))
self.assertTrue(session.committed)
self.assertEqual(len(session.added), 1)
self.assertEqual(result["status"], "agendado")
self.assertTrue(session.closed)
async def test_editar_data_revisao_releases_slot_lock_when_conflict_is_detected(self):
current_schedule = ReviewSchedule(
id=1,
protocolo="REV-20260318-AAAA1111",
user_id=7,
placa="ABC1234",
data_hora=datetime(2026, 3, 18, 9, 0),
status="agendado",
)
conflicting_schedule = ReviewSchedule(
id=2,
protocolo="REV-20260319-BBBB2222",
user_id=8,
placa="XYZ9876",
data_hora=datetime(2026, 3, 19, 10, 0),
status="agendado",
)
session = ReviewLockingSession(query_results=[current_schedule, conflicting_schedule])
with patch.object(review_service, "SessionMockLocal", return_value=session):
with self.assertRaises(HTTPException) as ctx:
await review_service.editar_data_revisao(
protocolo=current_schedule.protocolo,
nova_data_hora="19/03/2026 10:00",
user_id=7,
)
self.assertTrue(any("GET_LOCK" in sql for sql, _ in session.execute_calls))
self.assertTrue(any("RELEASE_LOCK" in sql for sql, _ in session.execute_calls))
self.assertEqual(ctx.exception.status_code, 409)
self.assertEqual(ctx.exception.detail["code"], "review_schedule_conflict")
self.assertFalse(session.committed)
self.assertTrue(session.closed)
if __name__ == "__main__":
unittest.main()