You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
orquestrador/tests/test_admin_auth_service.py

223 lines
8.1 KiB
Python

import unittest
from datetime import datetime, timedelta, timezone
from admin_app.core import AdminSecurityService, AdminSettings
from admin_app.db.models import AuditLog, StaffAccount, StaffSession
from admin_app.services import AuditService
from admin_app.services.auth_service import AuthService
from shared.contracts import StaffRole
class _FakeStaffAccountRepository:
def __init__(self, account: StaffAccount | None):
self.account = account
def get_by_email(self, email: str) -> StaffAccount | None:
if self.account and self.account.email == email:
return self.account
return None
def get_by_id(self, staff_account_id: int) -> StaffAccount | None:
if self.account and self.account.id == staff_account_id:
return self.account
return None
def update_last_login(self, staff_account: StaffAccount) -> StaffAccount:
self.account = staff_account
return staff_account
class _FakeStaffSessionRepository:
def __init__(self):
self.sessions: dict[int, StaffSession] = {}
self._next_id = 1
def create(self, *, staff_account_id: int, refresh_token_hash: str, expires_at: datetime, ip_address: str | None, user_agent: str | None) -> StaffSession:
session = StaffSession(
id=self._next_id,
staff_account_id=staff_account_id,
refresh_token_hash=refresh_token_hash,
expires_at=expires_at,
ip_address=ip_address,
user_agent=user_agent,
)
self.sessions[session.id] = session
self._next_id += 1
return session
def get_by_id(self, session_id: int) -> StaffSession | None:
return self.sessions.get(session_id)
def get_by_refresh_token_hash(self, refresh_token_hash: str) -> StaffSession | None:
for session in self.sessions.values():
if session.refresh_token_hash == refresh_token_hash:
return session
return None
def save(self, staff_session: StaffSession) -> StaffSession:
self.sessions[staff_session.id] = staff_session
return staff_session
class _FakeAuditLogRepository:
def __init__(self):
self.entries: list[AuditLog] = []
self._next_id = 1
def create(self, **kwargs) -> AuditLog:
audit_log = AuditLog(id=self._next_id, **kwargs)
self.entries.append(audit_log)
self._next_id += 1
return audit_log
def list_recent(self, limit: int = 50) -> list[AuditLog]:
return list(reversed(self.entries))[:limit]
class AdminAuthServiceTests(unittest.TestCase):
def setUp(self):
self.security_service = AdminSecurityService(
AdminSettings(
admin_auth_token_secret="test-secret",
admin_auth_password_pepper="pepper",
)
)
self.account = StaffAccount(
id=1,
email="admin@empresa.com",
display_name="Administrador",
password_hash=self.security_service.hash_password("SenhaMuitoSegura!123"),
role=StaffRole.DIRETOR,
is_active=True,
)
self.account_repository = _FakeStaffAccountRepository(self.account)
self.session_repository = _FakeStaffSessionRepository()
self.audit_repository = _FakeAuditLogRepository()
self.audit_service = AuditService(self.audit_repository)
self.auth_service = AuthService(
account_repository=self.account_repository,
session_repository=self.session_repository,
security_service=self.security_service,
audit_service=self.audit_service,
)
def test_login_creates_authenticated_session_with_refresh_token(self):
session = self.auth_service.login(
email="admin@empresa.com",
password="SenhaMuitoSegura!123",
ip_address="127.0.0.1",
user_agent="unittest",
)
self.assertIsNotNone(session)
self.assertEqual(session.session_id, 1)
self.assertTrue(session.access_token)
self.assertTrue(session.refresh_token)
self.assertEqual(session.principal.role, StaffRole.DIRETOR)
self.assertEqual(self.audit_repository.entries[-1].event_type, "staff.login.succeeded")
def test_login_failure_creates_audit_entry(self):
session = self.auth_service.login(
email="admin@empresa.com",
password="senha-incorreta",
ip_address="127.0.0.1",
user_agent="unittest",
)
self.assertIsNone(session)
self.assertEqual(self.audit_repository.entries[-1].event_type, "staff.login.failed")
self.assertEqual(self.audit_repository.entries[-1].outcome, "failed")
def test_refresh_session_rotates_refresh_token(self):
session = self.auth_service.login(
email="admin@empresa.com",
password="SenhaMuitoSegura!123",
ip_address="127.0.0.1",
user_agent="unittest",
)
refreshed = self.auth_service.refresh_session(
refresh_token=session.refresh_token,
ip_address="127.0.0.1",
user_agent="unittest-refresh",
)
self.assertIsNotNone(refreshed)
self.assertEqual(refreshed.session_id, session.session_id)
self.assertNotEqual(refreshed.refresh_token, session.refresh_token)
def test_logout_revokes_session_and_creates_audit_entry(self):
session = self.auth_service.login(
email="admin@empresa.com",
password="SenhaMuitoSegura!123",
ip_address="127.0.0.1",
user_agent="unittest",
)
self.assertTrue(
self.auth_service.logout(
session.session_id,
actor_staff_account_id=self.account.id,
ip_address="127.0.0.1",
user_agent="unittest",
)
)
self.assertIsNotNone(self.session_repository.get_by_id(session.session_id).revoked_at)
self.assertEqual(self.audit_repository.entries[-1].event_type, "staff.logout.succeeded")
def test_get_authenticated_context_rejects_revoked_session(self):
session = self.auth_service.login(
email="admin@empresa.com",
password="SenhaMuitoSegura!123",
ip_address="127.0.0.1",
user_agent="unittest",
)
self.auth_service.logout(
session.session_id,
actor_staff_account_id=self.account.id,
ip_address="127.0.0.1",
user_agent="unittest",
)
with self.assertRaises(ValueError):
self.auth_service.get_authenticated_context(session.access_token)
def test_get_authenticated_context_accepts_naive_session_expiry_from_database(self):
session = self.auth_service.login(
email="admin@empresa.com",
password="SenhaMuitoSegura!123",
ip_address="127.0.0.1",
user_agent="unittest",
)
stored_session = self.session_repository.get_by_id(session.session_id)
stored_session.expires_at = stored_session.expires_at.replace(tzinfo=None)
self.session_repository.save(stored_session)
context = self.auth_service.get_authenticated_context(session.access_token)
self.assertEqual(context.session_id, session.session_id)
self.assertEqual(context.principal.email, "admin@empresa.com")
def test_refresh_session_rejects_expired_session(self):
session = self.auth_service.login(
email="admin@empresa.com",
password="SenhaMuitoSegura!123",
ip_address="127.0.0.1",
user_agent="unittest",
)
stored_session = self.session_repository.get_by_id(session.session_id)
stored_session.expires_at = datetime.now(timezone.utc) - timedelta(minutes=1)
self.session_repository.save(stored_session)
refreshed = self.auth_service.refresh_session(
refresh_token=session.refresh_token,
ip_address="127.0.0.1",
user_agent="unittest-refresh",
)
self.assertIsNone(refreshed)
if __name__ == "__main__":
unittest.main()