From c8cff5fc3fef1c9f270aedde2dc235611d044619 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Vitor=20Hugo=20Belorio=20Sim=C3=A3o?= Date: Fri, 20 Mar 2026 11:19:21 -0300 Subject: [PATCH] =?UTF-8?q?=F0=9F=9B=A1=EF=B8=8F=20fix(concurrency):=20ser?= =?UTF-8?q?ializar=20chats=20e=20blindar=20conflitos=20de=20locacao?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Serializa o processamento do Telegram por chat com workers dedicados e semaforo global, evitando que uma mensagem lenta bloqueie os demais atendimentos enquanto preserva a ordem dentro de cada conversa. Protege a abertura de locacao com row lock no veiculo e adiciona lock de slot para agendamento e remarcacao de revisao, reduzindo o risco de corrida em reservas simultaneas. Amplia a cobertura com testes para paralelismo no satellite do Telegram, lock da locacao e lock dos horarios de revisao. --- .../telegram_satellite_service.py | 110 +++++++++++++-- app/services/domain/rental_service.py | 42 +++++- app/services/domain/review_service.py | 59 +++++++- tests/test_rental_service.py | 105 ++++++++++++++ tests/test_review_service.py | 133 ++++++++++++++++++ tests/test_telegram_multimodal.py | 74 ++++++++++ 6 files changed, 505 insertions(+), 18 deletions(-) create mode 100644 tests/test_review_service.py diff --git a/app/integrations/telegram_satellite_service.py b/app/integrations/telegram_satellite_service.py index da1001e..bb7da21 100644 --- a/app/integrations/telegram_satellite_service.py +++ b/app/integrations/telegram_satellite_service.py @@ -21,6 +21,7 @@ from app.services.user.user_service import UserService logger = logging.getLogger(__name__) TELEGRAM_MESSAGE_SAFE_LIMIT = 3800 +TELEGRAM_MAX_CONCURRENT_CHATS = 8 def _split_telegram_text(text: str, limit: int = TELEGRAM_MESSAGE_SAFE_LIMIT) -> List[str]: @@ -130,6 +131,10 @@ class TelegramSatelliteService: self.polling_timeout = settings.telegram_polling_timeout self.request_timeout = settings.telegram_request_timeout self._last_update_id = -1 + self._chat_queues: dict[int, asyncio.Queue[Dict[str, Any]]] = {} + self._chat_workers: dict[int, asyncio.Task[None]] = {} + self._chat_workers_lock = asyncio.Lock() + self._chat_processing_semaphore = asyncio.Semaphore(TELEGRAM_MAX_CONCURRENT_CHATS) async def run(self) -> None: """Inicia loop de long polling para consumir atualizacoes do bot.""" @@ -139,18 +144,101 @@ class TelegramSatelliteService: timeout = aiohttp.ClientTimeout(total=self.request_timeout) async with aiohttp.ClientSession(timeout=timeout) as session: - offset = await self._initialize_offset(session=session) + try: + offset = await self._initialize_offset(session=session) + while True: + updates = await self._get_updates(session=session, offset=offset) + for update in updates: + update_id = update.get("update_id") + if not isinstance(update_id, int): + continue + if update_id <= self._last_update_id: + continue + self._last_update_id = update_id + offset = update_id + 1 + await self._schedule_update_processing(session=session, update=update) + finally: + await self._shutdown_chat_workers() + + def _extract_chat_id(self, update: Dict[str, Any]) -> int | None: + message = update.get("message", {}) + chat = message.get("chat", {}) + chat_id = chat.get("id") + return chat_id if isinstance(chat_id, int) else None + + async def _schedule_update_processing( + self, + session: aiohttp.ClientSession, + update: Dict[str, Any], + ) -> None: + chat_id = self._extract_chat_id(update) + if chat_id is None: + async with self._chat_processing_semaphore: + await self._handle_update(session=session, update=update) + return + + async with self._chat_workers_lock: + queue = self._chat_queues.get(chat_id) + if queue is None: + queue = asyncio.Queue() + self._chat_queues[chat_id] = queue + queue.put_nowait(update) + + worker = self._chat_workers.get(chat_id) + if worker is None or worker.done(): + self._chat_workers[chat_id] = asyncio.create_task( + self._run_chat_worker( + chat_id=chat_id, + session=session, + queue=queue, + ) + ) + + async def _run_chat_worker( + self, + *, + chat_id: int, + session: aiohttp.ClientSession, + queue: asyncio.Queue[Dict[str, Any]], + ) -> None: + current_task = asyncio.current_task() + try: while True: - updates = await self._get_updates(session=session, offset=offset) - for update in updates: - update_id = update.get("update_id") - if not isinstance(update_id, int): - continue - if update_id <= self._last_update_id: - continue - self._last_update_id = update_id - offset = update_id + 1 - await self._handle_update(session=session, update=update) + update = await queue.get() + try: + async with self._chat_processing_semaphore: + await self._handle_update(session=session, update=update) + finally: + queue.task_done() + + async with self._chat_workers_lock: + if queue.empty(): + if self._chat_workers.get(chat_id) is current_task: + self._chat_workers.pop(chat_id, None) + if self._chat_queues.get(chat_id) is queue: + self._chat_queues.pop(chat_id, None) + return + except asyncio.CancelledError: + raise + except Exception: + logger.exception("Falha inesperada no worker do chat %s.", chat_id) + finally: + async with self._chat_workers_lock: + if self._chat_workers.get(chat_id) is current_task: + self._chat_workers.pop(chat_id, None) + if self._chat_queues.get(chat_id) is queue and queue.empty(): + self._chat_queues.pop(chat_id, None) + + async def _shutdown_chat_workers(self) -> None: + async with self._chat_workers_lock: + workers = list(self._chat_workers.values()) + self._chat_workers = {} + self._chat_queues = {} + + for worker in workers: + worker.cancel() + if workers: + await asyncio.gather(*workers, return_exceptions=True) async def _warmup_llm(self) -> None: """Preaquece o LLM no startup do satelite para reduzir latencia do primeiro usuario.""" diff --git a/app/services/domain/rental_service.py b/app/services/domain/rental_service.py index 7a20f60..dc9b6cd 100644 --- a/app/services/domain/rental_service.py +++ b/app/services/domain/rental_service.py @@ -132,18 +132,18 @@ def _calculate_rental_days(start: datetime, end: datetime) -> int: # Busca o veiculo da locacao por id ou placa normalizada. -def _lookup_rental_vehicle( +def _build_rental_vehicle_query( db, *, rental_vehicle_id: int | None = None, placa: str | None = None, -) -> RentalVehicle | None: +) -> Any: if rental_vehicle_id is not None: - return db.query(RentalVehicle).filter(RentalVehicle.id == rental_vehicle_id).first() + return db.query(RentalVehicle).filter(RentalVehicle.id == rental_vehicle_id) normalized_plate = technical_normalizer.normalize_plate(placa) if normalized_plate: - return db.query(RentalVehicle).filter(RentalVehicle.placa == normalized_plate).first() + return db.query(RentalVehicle).filter(RentalVehicle.placa == normalized_plate) raise_tool_http_error( status_code=400, @@ -152,8 +152,34 @@ def _lookup_rental_vehicle( retryable=True, field="placa", ) - return None + return db.query(RentalVehicle).filter(RentalVehicle.id.is_(None)) + + +def _lookup_rental_vehicle( + db, + *, + rental_vehicle_id: int | None = None, + placa: str | None = None, +) -> RentalVehicle | None: + return _build_rental_vehicle_query( + db, + rental_vehicle_id=rental_vehicle_id, + placa=placa, + ).first() + +# Recupera e trava o veiculo no mesmo turno transacional para evitar dupla locacao. +def _get_rental_vehicle_for_update( + db, + *, + rental_vehicle_id: int | None = None, + placa: str | None = None, +) -> RentalVehicle | None: + return _build_rental_vehicle_query( + db, + rental_vehicle_id=rental_vehicle_id, + placa=placa, + ).with_for_update().first() # Prioriza contratos do proprio usuario antes de cair para contratos sem dono. def _lookup_contract_by_user_preference(query, user_id: int | None): @@ -307,7 +333,11 @@ async def abrir_locacao_aluguel( db = SessionMockLocal() try: - vehicle = _lookup_rental_vehicle(db, rental_vehicle_id=vehicle_id, placa=placa) + vehicle = _get_rental_vehicle_for_update( + db, + rental_vehicle_id=vehicle_id, + placa=placa, + ) if vehicle is None: raise_tool_http_error( status_code=404, diff --git a/app/services/domain/review_service.py b/app/services/domain/review_service.py index 07cc757..ef0a210 100644 --- a/app/services/domain/review_service.py +++ b/app/services/domain/review_service.py @@ -4,7 +4,9 @@ from datetime import datetime, timedelta, timezone from typing import Any from fastapi import HTTPException -from sqlalchemy import func +from sqlalchemy import func, text + +from sqlalchemy.exc import OperationalError, SQLAlchemyError from app.db.mock_database import SessionMockLocal from app.db.mock_models import ReviewSchedule @@ -132,6 +134,49 @@ def _find_next_available_review_slot( return None +def _review_slot_lock_name(requested_dt: datetime) -> str: + return f"orquestrador:review_slot:{_normalize_review_slot(requested_dt).isoformat()}" + + +def _acquire_review_slot_lock( + db, + *, + requested_dt: datetime, + timeout_seconds: int = 5, + field_name: str = "data_hora", +) -> str | None: + lock_name = _review_slot_lock_name(requested_dt) + try: + acquired = db.execute( + text("SELECT GET_LOCK(:lock_name, :timeout_seconds)"), + {"lock_name": lock_name, "timeout_seconds": timeout_seconds}, + ).scalar() + except (OperationalError, SQLAlchemyError): + return None + + if int(acquired or 0) != 1: + raise_tool_http_error( + status_code=409, + code="review_slot_busy", + message="Outro atendimento esta finalizando este horario de revisao. Tente novamente.", + retryable=True, + field=field_name, + ) + return lock_name + + +def _release_review_slot_lock(db, lock_name: str | None) -> None: + if not lock_name: + return + try: + db.execute( + text("SELECT RELEASE_LOCK(:lock_name)"), + {"lock_name": lock_name}, + ) + except (OperationalError, SQLAlchemyError): + pass + + def build_review_conflict_detail( requested_dt: datetime, suggested_dt: datetime | None = None, @@ -223,7 +268,9 @@ async def agendar_revisao( protocolo = f"REV-{dt.strftime('%Y%m%d')}-{entropy}" db = SessionMockLocal() + review_slot_lock_name: str | None = None try: + review_slot_lock_name = _acquire_review_slot_lock(db, requested_dt=dt) conflito_horario = ( db.query(ReviewSchedule) .filter(ReviewSchedule.data_hora == dt) @@ -279,6 +326,7 @@ async def agendar_revisao( "valor_revisao": valor_revisao, } finally: + _release_review_slot_lock(db, review_slot_lock_name) db.close() @@ -413,6 +461,7 @@ async def editar_data_revisao( ) db = SessionMockLocal() + review_slot_lock_name: str | None = None try: agendamento = ( db.query(ReviewSchedule) @@ -437,6 +486,13 @@ async def editar_data_revisao( retryable=False, ) + if agendamento.data_hora != nova_data: + review_slot_lock_name = _acquire_review_slot_lock( + db, + requested_dt=nova_data, + field_name="nova_data_hora", + ) + conflito = ( db.query(ReviewSchedule) .filter(ReviewSchedule.id != agendamento.id) @@ -465,4 +521,5 @@ async def editar_data_revisao( "status": agendamento.status, } finally: + _release_review_slot_lock(db, review_slot_lock_name) db.close() diff --git a/tests/test_rental_service.py b/tests/test_rental_service.py index a72dbb3..2f5c630 100644 --- a/tests/test_rental_service.py +++ b/tests/test_rental_service.py @@ -1,7 +1,9 @@ import unittest from datetime import datetime +from types import SimpleNamespace from unittest.mock import patch +from fastapi import HTTPException from sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker @@ -10,6 +12,48 @@ from app.db.mock_models import RentalContract, RentalFine, RentalPayment, Rental from app.services.domain import rental_service +class RentalLockingQuery: + def __init__(self, result): + self.result = result + self.with_for_update_called = False + + def filter(self, *args, **kwargs): + return self + + def with_for_update(self): + self.with_for_update_called = True + return self + + def first(self): + return self.result + + +class RentalLockingSession: + def __init__(self, vehicle=None): + self.vehicle = vehicle + self.query_instance = RentalLockingQuery(vehicle) + self.added = [] + self.committed = False + self.closed = False + self.refreshed = [] + + def query(self, model): + if model is rental_service.RentalVehicle: + return self.query_instance + raise AssertionError(f"unexpected model query: {model}") + + def add(self, item): + self.added.append(item) + + def commit(self): + self.committed = True + + def refresh(self, item): + self.refreshed.append(item) + + def close(self): + self.closed = True + class RentalServiceTests(unittest.IsolatedAsyncioTestCase): def _build_session_local(self): engine = create_engine("sqlite:///:memory:") @@ -151,6 +195,67 @@ class RentalServiceTests(unittest.IsolatedAsyncioTestCase): finally: db.close() + async def test_abrir_locacao_aluguel_uses_row_lock_before_reserving_vehicle(self): + vehicle = RentalVehicle( + id=8, + placa="ABC1D23", + modelo="Chevrolet Tracker", + categoria="suv", + ano=2024, + valor_diaria=219.9, + status="disponivel", + ) + session = RentalLockingSession(vehicle=vehicle) + fake_uuid = SimpleNamespace(hex="abc123def456") + fixed_now = datetime(2026, 3, 20, 9, 0) + + with patch.object(rental_service, "SessionMockLocal", return_value=session), patch.object( + rental_service, + "uuid4", + return_value=fake_uuid, + ), patch.object(rental_service, "utc_now", return_value=fixed_now): + result = await rental_service.abrir_locacao_aluguel( + rental_vehicle_id=8, + data_inicio="17/03/2026 10:00", + data_fim_prevista="20/03/2026 10:00", + ) + + self.assertTrue(session.query_instance.with_for_update_called) + self.assertTrue(session.committed) + self.assertEqual(len(session.added), 1) + self.assertEqual(session.added[0].rental_vehicle_id, 8) + self.assertEqual(vehicle.status, "alugado") + self.assertEqual(result["contrato_numero"], "LOC-20260320-ABC123DE") + self.assertEqual(result["status_veiculo"], "alugado") + self.assertTrue(session.closed) + + async def test_abrir_locacao_aluguel_returns_conflict_when_vehicle_status_is_already_rented_after_lock(self): + vehicle = RentalVehicle( + id=8, + placa="ABC1D23", + modelo="Chevrolet Tracker", + categoria="suv", + ano=2024, + valor_diaria=219.9, + status="alugado", + ) + session = RentalLockingSession(vehicle=vehicle) + + with patch.object(rental_service, "SessionMockLocal", return_value=session): + with self.assertRaises(HTTPException) as ctx: + await rental_service.abrir_locacao_aluguel( + rental_vehicle_id=8, + data_inicio="17/03/2026 10:00", + data_fim_prevista="20/03/2026 10:00", + ) + + self.assertTrue(session.query_instance.with_for_update_called) + self.assertEqual(ctx.exception.status_code, 409) + self.assertEqual(ctx.exception.detail["code"], "rental_vehicle_unavailable") + self.assertFalse(session.committed) + self.assertEqual(session.added, []) + self.assertTrue(session.closed) + async def test_registrar_devolucao_aluguel_fecha_contrato_e_libera_veiculo(self): SessionLocal = self._build_session_local() db = SessionLocal() diff --git a/tests/test_review_service.py b/tests/test_review_service.py new file mode 100644 index 0000000..cb4aaf7 --- /dev/null +++ b/tests/test_review_service.py @@ -0,0 +1,133 @@ +import unittest +from datetime import datetime +from types import SimpleNamespace +from unittest.mock import patch + +from fastapi import HTTPException + +from app.db.mock_models import ReviewSchedule +from app.services.domain import review_service + + +class ReviewLockingQuery: + def __init__(self, results=None): + self.results = list(results or []) + + def filter(self, *args, **kwargs): + return self + + def first(self): + if self.results: + return self.results.pop(0) + return None + + +class ReviewLockingSession: + def __init__(self, *, query_results=None, lock_acquired=1): + self.query_instance = ReviewLockingQuery(query_results) + self.lock_acquired = lock_acquired + self.execute_calls = [] + self.added = [] + self.committed = False + self.closed = False + self.refreshed = [] + + def query(self, model): + if model is review_service.ReviewSchedule: + return self.query_instance + raise AssertionError(f"unexpected model query: {model}") + + def execute(self, statement, params=None): + sql_text = str(statement) + self.execute_calls.append((sql_text, params)) + if "GET_LOCK" in sql_text: + return SimpleNamespace(scalar=lambda: self.lock_acquired) + if "RELEASE_LOCK" in sql_text: + return SimpleNamespace(scalar=lambda: 1) + raise AssertionError(f"unexpected execute call: {sql_text}") + + def add(self, item): + self.added.append(item) + + def commit(self): + self.committed = True + + def refresh(self, item): + self.refreshed.append(item) + + def close(self): + self.closed = True + + +class ReviewServiceLockingTests(unittest.IsolatedAsyncioTestCase): + def test_acquire_review_slot_lock_returns_conflict_when_slot_is_busy(self): + session = ReviewLockingSession(lock_acquired=0) + + with self.assertRaises(HTTPException) as ctx: + review_service._acquire_review_slot_lock( + session, + requested_dt=datetime(2026, 3, 18, 9, 0), + ) + + self.assertEqual(ctx.exception.status_code, 409) + self.assertEqual(ctx.exception.detail["code"], "review_slot_busy") + self.assertTrue(any("GET_LOCK" in sql for sql, _ in session.execute_calls)) + + async def test_agendar_revisao_uses_slot_lock_and_releases_after_success(self): + session = ReviewLockingSession(query_results=[None, None]) + + with patch.object(review_service, "SessionMockLocal", return_value=session): + result = await review_service.agendar_revisao( + placa="ABC1234", + data_hora="18/03/2026 09:00", + modelo="Onix", + ano=2022, + km=15000, + revisao_previa_concessionaria=False, + user_id=7, + ) + + self.assertTrue(any("GET_LOCK" in sql for sql, _ in session.execute_calls)) + self.assertTrue(any("RELEASE_LOCK" in sql for sql, _ in session.execute_calls)) + self.assertTrue(session.committed) + self.assertEqual(len(session.added), 1) + self.assertEqual(result["status"], "agendado") + self.assertTrue(session.closed) + + async def test_editar_data_revisao_releases_slot_lock_when_conflict_is_detected(self): + current_schedule = ReviewSchedule( + id=1, + protocolo="REV-20260318-AAAA1111", + user_id=7, + placa="ABC1234", + data_hora=datetime(2026, 3, 18, 9, 0), + status="agendado", + ) + conflicting_schedule = ReviewSchedule( + id=2, + protocolo="REV-20260319-BBBB2222", + user_id=8, + placa="XYZ9876", + data_hora=datetime(2026, 3, 19, 10, 0), + status="agendado", + ) + session = ReviewLockingSession(query_results=[current_schedule, conflicting_schedule]) + + with patch.object(review_service, "SessionMockLocal", return_value=session): + with self.assertRaises(HTTPException) as ctx: + await review_service.editar_data_revisao( + protocolo=current_schedule.protocolo, + nova_data_hora="19/03/2026 10:00", + user_id=7, + ) + + self.assertTrue(any("GET_LOCK" in sql for sql, _ in session.execute_calls)) + self.assertTrue(any("RELEASE_LOCK" in sql for sql, _ in session.execute_calls)) + self.assertEqual(ctx.exception.status_code, 409) + self.assertEqual(ctx.exception.detail["code"], "review_schedule_conflict") + self.assertFalse(session.committed) + self.assertTrue(session.closed) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_telegram_multimodal.py b/tests/test_telegram_multimodal.py index b837908..c4bfdad 100644 --- a/tests/test_telegram_multimodal.py +++ b/tests/test_telegram_multimodal.py @@ -1,4 +1,5 @@ import unittest +import asyncio from types import SimpleNamespace from unittest.mock import AsyncMock, patch @@ -11,8 +12,14 @@ class _DummySession: class TelegramMultimodalTests(unittest.IsolatedAsyncioTestCase): + async def asyncTearDown(self): + service = getattr(self, "_service_under_test", None) + if service is not None: + await service._shutdown_chat_workers() + async def test_process_message_uses_extracted_image_message(self): service = TelegramSatelliteService("token-teste") + self._service_under_test = service tools_db = _DummySession() mock_db = _DummySession() @@ -44,6 +51,7 @@ class TelegramMultimodalTests(unittest.IsolatedAsyncioTestCase): async def test_process_message_returns_direct_failure_for_unreadable_image(self): service = TelegramSatelliteService("token-teste") + self._service_under_test = service tools_db = _DummySession() mock_db = _DummySession() @@ -72,6 +80,7 @@ class TelegramMultimodalTests(unittest.IsolatedAsyncioTestCase): async def test_process_message_returns_direct_failure_for_receipt_without_watermark(self): service = TelegramSatelliteService("token-teste") + self._service_under_test = service tools_db = _DummySession() mock_db = _DummySession() @@ -97,3 +106,68 @@ class TelegramMultimodalTests(unittest.IsolatedAsyncioTestCase): self.assertIn("marca d'agua SysaltiIA visivel", answer) self.assertFalse(orchestrator_cls.return_value.handle_message.await_count) + + async def test_schedule_update_processing_allows_parallel_chats(self): + service = TelegramSatelliteService("token-teste") + self._service_under_test = service + release_first_chat = asyncio.Event() + chat_one_started = asyncio.Event() + started_chats: list[int] = [] + + async def fake_handle_update(*, session, update): + chat_id = update["message"]["chat"]["id"] + started_chats.append(chat_id) + if chat_id == 1: + chat_one_started.set() + await release_first_chat.wait() + + with patch.object(service, "_handle_update", new=fake_handle_update): + await service._schedule_update_processing( + session=SimpleNamespace(), + update={"update_id": 1, "message": {"chat": {"id": 1}, "text": "primeiro"}}, + ) + await chat_one_started.wait() + await service._schedule_update_processing( + session=SimpleNamespace(), + update={"update_id": 2, "message": {"chat": {"id": 2}, "text": "segundo"}}, + ) + await asyncio.sleep(0) + + self.assertEqual(started_chats, [1, 2]) + release_first_chat.set() + await asyncio.sleep(0) + + async def test_schedule_update_processing_preserves_order_per_chat(self): + service = TelegramSatelliteService("token-teste") + self._service_under_test = service + first_started = asyncio.Event() + allow_first_to_finish = asyncio.Event() + second_started = asyncio.Event() + started_updates: list[int] = [] + + async def fake_handle_update(*, session, update): + update_id = update["update_id"] + started_updates.append(update_id) + if update_id == 1: + first_started.set() + await allow_first_to_finish.wait() + return + second_started.set() + + with patch.object(service, "_handle_update", new=fake_handle_update): + await service._schedule_update_processing( + session=SimpleNamespace(), + update={"update_id": 1, "message": {"chat": {"id": 1}, "text": "primeiro"}}, + ) + await first_started.wait() + await service._schedule_update_processing( + session=SimpleNamespace(), + update={"update_id": 2, "message": {"chat": {"id": 1}, "text": "segundo"}}, + ) + await asyncio.sleep(0) + self.assertFalse(second_started.is_set()) + + allow_first_to_finish.set() + await asyncio.wait_for(second_started.wait(), timeout=1) + + self.assertEqual(started_updates, [1, 2])