import unittest
from datetime import datetime, timedelta, timezone

from admin_app.core import AdminSecurityService, AdminSettings
from admin_app.db.models import AuditLog, StaffAccount, StaffSession
from admin_app.services import AuditService
from admin_app.services.auth_service import AuthService
from shared.contracts import StaffRole

# Fixture values shared by every test in this module.
_ADMIN_EMAIL = "admin@empresa.com"
_ADMIN_PASSWORD = "SenhaMuitoSegura!123"
_CLIENT_IP = "127.0.0.1"
_USER_AGENT = "unittest"
_REFRESH_USER_AGENT = "unittest-refresh"


class _FakeStaffAccountRepository:
    """In-memory account repository that holds at most one staff account."""

    def __init__(self, account: StaffAccount | None):
        self.account = account

    def get_by_email(self, email: str) -> StaffAccount | None:
        """Return the stored account when its email equals *email*, else None."""
        stored = self.account
        if not stored:
            return None
        if stored.email != email:
            return None
        return stored

    def get_by_id(self, staff_account_id: int) -> StaffAccount | None:
        """Return the stored account when its id equals *staff_account_id*, else None."""
        stored = self.account
        if not stored:
            return None
        if stored.id != staff_account_id:
            return None
        return stored

    def update_last_login(self, staff_account: StaffAccount) -> StaffAccount:
        """Replace the stored account with *staff_account* and return it."""
        self.account = staff_account
        return staff_account


class _FakeStaffSessionRepository:
    """In-memory session repository keyed by sequentially assigned ids."""

    def __init__(self):
        self.sessions: dict[int, StaffSession] = {}
        self._next_id = 1

    def create(
        self,
        *,
        staff_account_id: int,
        refresh_token_hash: str,
        expires_at: datetime,
        ip_address: str | None,
        user_agent: str | None,
    ) -> StaffSession:
        """Build a StaffSession with the next free id, store it and return it."""
        record = StaffSession(
            id=self._next_id,
            staff_account_id=staff_account_id,
            refresh_token_hash=refresh_token_hash,
            expires_at=expires_at,
            ip_address=ip_address,
            user_agent=user_agent,
        )
        self.sessions[record.id] = record
        self._next_id += 1
        return record

    def get_by_id(self, session_id: int) -> StaffSession | None:
        """Return the session stored under *session_id*, or None."""
        return self.sessions.get(session_id)

    def get_by_refresh_token_hash(self, refresh_token_hash: str) -> StaffSession | None:
        """Return the first stored session whose hash matches, or None."""
        matches = (
            candidate
            for candidate in self.sessions.values()
            if candidate.refresh_token_hash == refresh_token_hash
        )
        return next(matches, None)

    def save(self, staff_session: StaffSession) -> StaffSession:
        """Store *staff_session* under its own id and return it."""
        self.sessions[staff_session.id] = staff_session
        return staff_session


class _FakeAuditLogRepository:
    """In-memory audit log that appends entries in creation order."""

    def __init__(self):
        self.entries: list[AuditLog] = []
        self._next_id = 1

    def create(self, **kwargs) -> AuditLog:
        """Build an AuditLog from *kwargs* with the next free id and append it."""
        entry = AuditLog(id=self._next_id, **kwargs)
        self.entries.append(entry)
        self._next_id += 1
        return entry

    def list_recent(self, limit: int = 50) -> list[AuditLog]:
        """Return up to *limit* entries, newest first."""
        newest_first = self.entries[::-1]
        return newest_first[:limit]


class AdminAuthServiceTests(unittest.TestCase):
    """Exercise AuthService against the in-memory fake repositories."""

    def setUp(self):
        settings = AdminSettings(
            admin_auth_token_secret="test-secret",
            admin_auth_password_pepper="pepper",
        )
        self.security_service = AdminSecurityService(settings)

        hashed_password = self.security_service.hash_password(_ADMIN_PASSWORD)
        self.account = StaffAccount(
            id=1,
            email=_ADMIN_EMAIL,
            display_name="Administrador",
            password_hash=hashed_password,
            role=StaffRole.DIRETOR,
            is_active=True,
        )

        self.account_repository = _FakeStaffAccountRepository(self.account)
        self.session_repository = _FakeStaffSessionRepository()
        self.audit_repository = _FakeAuditLogRepository()
        self.audit_service = AuditService(self.audit_repository)
        self.auth_service = AuthService(
            account_repository=self.account_repository,
            session_repository=self.session_repository,
            security_service=self.security_service,
            audit_service=self.audit_service,
        )

    def _login(self, password: str = _ADMIN_PASSWORD):
        """Log in as the fixture account with *password* and return the result."""
        return self.auth_service.login(
            email=_ADMIN_EMAIL,
            password=password,
            ip_address=_CLIENT_IP,
            user_agent=_USER_AGENT,
        )

    def _logout(self, session):
        """Log out *session* on behalf of the fixture account and return the result."""
        return self.auth_service.logout(
            session.session_id,
            actor_staff_account_id=self.account.id,
            ip_address=_CLIENT_IP,
            user_agent=_USER_AGENT,
        )

    def _refresh(self, session):
        """Call refresh_session with *session*'s refresh token and return the result."""
        return self.auth_service.refresh_session(
            refresh_token=session.refresh_token,
            ip_address=_CLIENT_IP,
            user_agent=_REFRESH_USER_AGENT,
        )

    def test_login_creates_authenticated_session_with_refresh_token(self):
        # A correct password yields session 1 with both tokens and a success audit entry.
        session = self._login()

        self.assertIsNotNone(session)
        self.assertEqual(session.session_id, 1)
        self.assertTrue(session.access_token)
        self.assertTrue(session.refresh_token)
        self.assertEqual(session.principal.role, StaffRole.DIRETOR)
        latest_entry = self.audit_repository.entries[-1]
        self.assertEqual(latest_entry.event_type, "staff.login.succeeded")

    def test_login_failure_creates_audit_entry(self):
        # A wrong password yields no session and a failed-login audit entry.
        session = self._login(password="senha-incorreta")

        self.assertIsNone(session)
        self.assertEqual(self.audit_repository.entries[-1].event_type, "staff.login.failed")
        self.assertEqual(self.audit_repository.entries[-1].outcome, "failed")

    def test_refresh_session_rotates_refresh_token(self):
        # Refreshing keeps the session id but issues a different refresh token.
        session = self._login()

        refreshed = self._refresh(session)

        self.assertIsNotNone(refreshed)
        self.assertEqual(refreshed.session_id, session.session_id)
        self.assertNotEqual(refreshed.refresh_token, session.refresh_token)

    def test_logout_revokes_session_and_creates_audit_entry(self):
        # Logout reports success, stamps revoked_at and records an audit entry.
        session = self._login()

        self.assertTrue(self._logout(session))

        stored_session = self.session_repository.get_by_id(session.session_id)
        self.assertIsNotNone(stored_session.revoked_at)
        latest_entry = self.audit_repository.entries[-1]
        self.assertEqual(latest_entry.event_type, "staff.logout.succeeded")

    def test_get_authenticated_context_rejects_revoked_session(self):
        # An access token for a logged-out session must raise ValueError.
        session = self._login()
        self._logout(session)

        with self.assertRaises(ValueError):
            self.auth_service.get_authenticated_context(session.access_token)

    def test_get_authenticated_context_accepts_naive_session_expiry_from_database(self):
        # Strip tzinfo from the stored expiry so the service sees a naive datetime.
        session = self._login()
        stored_session = self.session_repository.get_by_id(session.session_id)
        naive_expiry = stored_session.expires_at.replace(tzinfo=None)
        stored_session.expires_at = naive_expiry
        self.session_repository.save(stored_session)

        context = self.auth_service.get_authenticated_context(session.access_token)

        self.assertEqual(context.session_id, session.session_id)
        self.assertEqual(context.principal.email, "admin@empresa.com")

    def test_refresh_session_rejects_expired_session(self):
        # Move the stored expiry one minute into the past before refreshing.
        session = self._login()
        stored_session = self.session_repository.get_by_id(session.session_id)
        one_minute_ago = datetime.now(timezone.utc) - timedelta(minutes=1)
        stored_session.expires_at = one_minute_ago
        self.session_repository.save(stored_session)

        refreshed = self._refresh(session)

        self.assertIsNone(refreshed)


if __name__ == "__main__":
    unittest.main()